use std::collections::{BTreeSet, HashMap};
use std::sync::Arc;
use crate::http_client;
use crate::text_safety::sanitize_for_terminal;
use super::config::{ApiVersion, ElectricityMapsConfig, EmissionFactorType, TemporalGranularity};
use super::state::{ElectricityMapsState, IntensityReading, monotonic_ms};
const MAX_API_BODY_BYTES: usize = 1024 * 1024;
const REQUEST_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);
const FAILURE_THRESHOLD: u32 = 3;
const MAX_ESTIMATION_METHOD_LEN: usize = 64;
#[derive(Debug, thiserror::Error)]
enum EmapsScraperError {
#[error("invalid API URI: {0}")]
InvalidUri(String),
#[error("HTTP transport error")]
Transport(#[source] hyper_util::client::legacy::Error),
#[error("API body read failed: {0}")]
BodyRead(String),
#[error("API returned HTTP {0}")]
HttpStatus(u16),
#[error("API request timed out (5s)")]
Timeout,
#[error("JSON parse error: {0}")]
JsonParse(String),
}
#[derive(serde::Deserialize)]
struct CarbonIntensityResponse {
#[serde(rename = "carbonIntensity")]
carbon_intensity: f64,
#[serde(default, rename = "isEstimated")]
is_estimated: Option<bool>,
#[serde(default, rename = "estimationMethod")]
estimation_method: Option<String>,
}
#[derive(Debug)]
struct FetchedReading {
gco2_per_kwh: f64,
is_estimated: Option<bool>,
estimation_method: Option<String>,
}
#[must_use]
pub fn spawn_electricity_maps_scraper(
config: ElectricityMapsConfig,
state: Arc<ElectricityMapsState>,
) -> tokio::task::JoinHandle<()> {
warn_if_legacy_v3_endpoint(&config.api_endpoint);
tokio::spawn(run_scraper_loop(config, state))
}
fn warn_if_legacy_v3_endpoint(endpoint: &str) {
if ApiVersion::from_endpoint(endpoint) == ApiVersion::V3 {
let safe_endpoint = sanitize_for_terminal(endpoint);
tracing::warn!(
endpoint = %safe_endpoint,
"Electricity Maps endpoint configured with legacy /v3 path. \
v3 is still supported but in legacy mode. Migrate to v4 by \
setting `endpoint = \"https://api.electricitymaps.com/v4\"` \
in your perf-sentinel TOML config. \
See https://app.electricitymaps.com/developer-hub/api/reference"
);
}
}
async fn run_scraper_loop(config: ElectricityMapsConfig, state: Arc<ElectricityMapsState>) {
let client = http_client::build_client();
let mut ticker = tokio::time::interval(config.poll_interval);
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
let mut consecutive_failures: u32 = 0;
loop {
ticker.tick().await;
let any_success = run_one_tick(&client, &config, &state).await;
update_failure_counter(&mut consecutive_failures, any_success);
}
}
async fn run_one_tick(
client: &http_client::HttpClient,
config: &ElectricityMapsConfig,
state: &ElectricityMapsState,
) -> bool {
let unique_zones: BTreeSet<&str> = config.region_map.values().map(String::as_str).collect();
if unique_zones.is_empty() {
return false;
}
let zone_readings = fetch_zones(client, config, &unique_zones).await;
if zone_readings.is_empty() {
return false;
}
let now = monotonic_ms();
let mut new_table = state.current_owned();
dispatch_readings(&zone_readings, &config.region_map, now, &mut new_table);
state.publish(new_table);
true
}
async fn fetch_zones<'a>(
client: &http_client::HttpClient,
config: &ElectricityMapsConfig,
unique_zones: &BTreeSet<&'a str>,
) -> HashMap<&'a str, FetchedReading> {
let mut zone_readings: HashMap<&str, FetchedReading> =
HashMap::with_capacity(unique_zones.len());
for zone in unique_zones {
match fetch_intensity(
client,
&config.api_endpoint,
&config.auth_token,
zone,
config.emission_factor_type,
config.temporal_granularity,
)
.await
{
Ok(reading) => {
tracing::debug!(
zone = %zone,
intensity = reading.gco2_per_kwh,
"Electricity Maps: fetched intensity"
);
zone_readings.insert(*zone, reading);
}
Err(e) => {
tracing::debug!(
zone = %zone,
error = %e,
"Electricity Maps: failed to fetch intensity"
);
}
}
}
zone_readings
}
fn dispatch_readings(
zone_readings: &HashMap<&str, FetchedReading>,
region_map: &HashMap<String, String>,
now: u64,
new_table: &mut HashMap<String, IntensityReading>,
) {
for (cloud_region, zone) in region_map {
if let Some(reading) = zone_readings.get(zone.as_str()) {
new_table.insert(
cloud_region.clone(),
IntensityReading {
gco2_per_kwh: reading.gco2_per_kwh,
last_update_ms: now,
is_estimated: reading.is_estimated,
estimation_method: reading.estimation_method.clone(),
},
);
}
}
}
fn update_failure_counter(consecutive_failures: &mut u32, any_success: bool) {
if any_success {
*consecutive_failures = 0;
return;
}
*consecutive_failures = consecutive_failures.saturating_add(1);
if *consecutive_failures == FAILURE_THRESHOLD {
tracing::warn!(
failures = *consecutive_failures,
"Electricity Maps: {} consecutive failures, \
falling back to embedded profiles",
*consecutive_failures
);
}
}
fn build_request_url(
api_endpoint: &str,
zone: &str,
emission_factor_type: EmissionFactorType,
temporal_granularity: TemporalGranularity,
) -> String {
let mut uri_str = format!("{api_endpoint}/carbon-intensity/latest?zone={zone}");
if emission_factor_type != EmissionFactorType::default() {
uri_str.push_str("&emissionFactorType=");
uri_str.push_str(emission_factor_type.as_query_value());
}
if temporal_granularity != TemporalGranularity::default() {
uri_str.push_str("&temporalGranularity=");
uri_str.push_str(temporal_granularity.as_query_value());
}
uri_str
}
async fn fetch_intensity(
client: &http_client::HttpClient,
api_endpoint: &str,
auth_token: &str,
zone: &str,
emission_factor_type: EmissionFactorType,
temporal_granularity: TemporalGranularity,
) -> Result<FetchedReading, EmapsScraperError> {
let uri_str = build_request_url(
api_endpoint,
zone,
emission_factor_type,
temporal_granularity,
);
let uri: hyper::Uri = uri_str
.parse()
.map_err(|e: hyper::http::uri::InvalidUri| EmapsScraperError::InvalidUri(e.to_string()))?;
let req = hyper::Request::builder()
.method(hyper::Method::GET)
.uri(&uri)
.header("auth-token", auth_token)
.header("User-Agent", "perf-sentinel")
.body(http_body_util::Empty::<bytes::Bytes>::new())
.map_err(|e| EmapsScraperError::BodyRead(e.to_string()))?;
let resp = tokio::time::timeout(REQUEST_TIMEOUT, client.request(req))
.await
.map_err(|_| EmapsScraperError::Timeout)?
.map_err(EmapsScraperError::Transport)?;
let status = resp.status().as_u16();
if status != 200 {
tracing::debug!(
status,
endpoint = %http_client::redact_endpoint(&uri),
"Electricity Maps: non-200 response"
);
return Err(EmapsScraperError::HttpStatus(status));
}
let limited = http_body_util::Limited::new(resp.into_body(), MAX_API_BODY_BYTES);
let body = http_body_util::BodyExt::collect(limited)
.await
.map_err(|e| EmapsScraperError::BodyRead(e.to_string()))?
.to_bytes();
let text =
std::str::from_utf8(&body).map_err(|e| EmapsScraperError::BodyRead(e.to_string()))?;
let response: CarbonIntensityResponse =
serde_json::from_str(text).map_err(|e| EmapsScraperError::JsonParse(e.to_string()))?;
if !response.carbon_intensity.is_finite() || response.carbon_intensity < 0.0 {
return Err(EmapsScraperError::JsonParse(format!(
"invalid carbon intensity value: {}",
response.carbon_intensity
)));
}
Ok(FetchedReading {
gco2_per_kwh: response.carbon_intensity,
is_estimated: response.is_estimated,
estimation_method: response
.estimation_method
.and_then(sanitize_estimation_method),
})
}
fn sanitize_estimation_method(s: String) -> Option<String> {
if s.len() > MAX_ESTIMATION_METHOD_LEN {
return None;
}
if s.chars().any(char::is_control) {
return None;
}
Some(s)
}
#[cfg(test)]
mod tests {
use super::*;
use core::assert_matches;
async fn fetch_intensity_test(
client: &http_client::HttpClient,
api_endpoint: &str,
auth_token: &str,
zone: &str,
) -> Result<FetchedReading, EmapsScraperError> {
fetch_intensity(
client,
api_endpoint,
auth_token,
zone,
EmissionFactorType::default(),
TemporalGranularity::default(),
)
.await
}
#[test]
fn build_request_url_omits_query_params_when_defaults_used() {
let url = build_request_url(
"https://api.electricitymaps.com/v4",
"FR",
EmissionFactorType::Lifecycle,
TemporalGranularity::Hourly,
);
assert_eq!(
url,
"https://api.electricitymaps.com/v4/carbon-intensity/latest?zone=FR"
);
}
#[test]
fn build_request_url_appends_emission_factor_type_when_direct() {
let url = build_request_url(
"https://api.electricitymaps.com/v4",
"FR",
EmissionFactorType::Direct,
TemporalGranularity::Hourly,
);
assert_eq!(
url,
"https://api.electricitymaps.com/v4/carbon-intensity/latest?zone=FR&emissionFactorType=direct"
);
}
#[test]
fn build_request_url_appends_temporal_granularity_when_sub_hour() {
let url = build_request_url(
"https://api.electricitymaps.com/v4",
"FR",
EmissionFactorType::Lifecycle,
TemporalGranularity::FiveMinutes,
);
assert_eq!(
url,
"https://api.electricitymaps.com/v4/carbon-intensity/latest?zone=FR&temporalGranularity=5_minutes"
);
}
#[test]
fn build_request_url_appends_both_knobs_when_both_non_default() {
let url = build_request_url(
"https://api.electricitymaps.com/v4",
"DE",
EmissionFactorType::Direct,
TemporalGranularity::FifteenMinutes,
);
assert_eq!(
url,
"https://api.electricitymaps.com/v4/carbon-intensity/latest?zone=DE&emissionFactorType=direct&temporalGranularity=15_minutes"
);
}
#[test]
fn parse_valid_response() {
let json = r#"{"zone":"FR","carbonIntensity":56.0,"datetime":"2025-01-01T12:00:00Z"}"#;
let resp: CarbonIntensityResponse = serde_json::from_str(json).unwrap();
assert!((resp.carbon_intensity - 56.0).abs() < 1e-10);
}
#[test]
fn parse_response_missing_field() {
let json = r#"{"zone":"FR"}"#;
let result: Result<CarbonIntensityResponse, _> = serde_json::from_str(json);
assert!(result.is_err());
}
use crate::test_helpers::{http_200_text, http_status, spawn_one_shot_server};
fn http_200(body: &str) -> Vec<u8> {
http_200_text("application/json", body)
}
#[tokio::test]
async fn fetch_intensity_success_happy_path() {
let body = r#"{"zone":"FR","carbonIntensity":56.0,"datetime":"2025-01-01T12:00:00Z"}"#;
let (endpoint, server) = spawn_one_shot_server(http_200(body)).await;
let client = http_client::build_client();
let reading = fetch_intensity_test(&client, &endpoint, "test-token", "FR")
.await
.expect("200 + valid JSON should succeed");
assert!((reading.gco2_per_kwh - 56.0).abs() < 1e-10);
assert_eq!(reading.is_estimated, None);
assert_eq!(reading.estimation_method, None);
server.await.unwrap();
}
#[tokio::test]
async fn fetch_intensity_parses_estimation_metadata_when_present() {
let body = r#"{"zone":"DE","carbonIntensity":380.0,"isEstimated":true,"estimationMethod":"TIME_SLICER_AVERAGE"}"#;
let (endpoint, server) = spawn_one_shot_server(http_200(body)).await;
let client = http_client::build_client();
let reading = fetch_intensity_test(&client, &endpoint, "tok", "DE")
.await
.expect("200 + valid JSON should succeed");
assert_eq!(reading.is_estimated, Some(true));
assert_eq!(
reading.estimation_method.as_deref(),
Some("TIME_SLICER_AVERAGE")
);
server.await.unwrap();
}
#[tokio::test]
async fn fetch_intensity_handles_explicit_measured_flag() {
let body = r#"{"zone":"FR","carbonIntensity":56.0,"isEstimated":false}"#;
let (endpoint, server) = spawn_one_shot_server(http_200(body)).await;
let client = http_client::build_client();
let reading = fetch_intensity_test(&client, &endpoint, "tok", "FR")
.await
.expect("200 + valid JSON should succeed");
assert_eq!(reading.is_estimated, Some(false));
assert_eq!(reading.estimation_method, None);
server.await.unwrap();
}
#[test]
fn sanitize_estimation_method_drops_oversized_strings() {
let too_long = "X".repeat(MAX_ESTIMATION_METHOD_LEN + 1);
assert_eq!(sanitize_estimation_method(too_long), None);
}
#[test]
fn sanitize_estimation_method_drops_control_characters() {
assert_eq!(
sanitize_estimation_method("FOO\nBAR".to_string()),
None,
"newline must be rejected to prevent log forging"
);
assert_eq!(
sanitize_estimation_method("FOO\x1b[31mBAR".to_string()),
None,
"ANSI escape must be rejected"
);
}
#[test]
fn sanitize_estimation_method_preserves_realistic_values() {
for v in [
"TIME_SLICER_AVERAGE",
"GENERAL_PURPOSE_ZONE_DEVELOPMENT",
"FUTURE_ALGO_42",
] {
assert_eq!(
sanitize_estimation_method(v.to_string()).as_deref(),
Some(v)
);
}
}
#[tokio::test]
async fn fetch_intensity_drops_oversized_estimation_method() {
let big = "X".repeat(MAX_ESTIMATION_METHOD_LEN + 10);
let body = format!(
r#"{{"zone":"FR","carbonIntensity":56.0,"isEstimated":true,"estimationMethod":"{big}"}}"#
);
let (endpoint, server) = spawn_one_shot_server(http_200(&body)).await;
let client = http_client::build_client();
let reading = fetch_intensity_test(&client, &endpoint, "tok", "FR")
.await
.expect("oversized method must be sanitized, not rejected");
assert_eq!(reading.is_estimated, Some(true));
assert_eq!(reading.estimation_method, None);
server.await.unwrap();
}
#[tokio::test]
async fn fetch_intensity_v3_and_v4_responses_parse_identically() {
let body = r#"{"zone":"FR","carbonIntensity":56.0,"isEstimated":true,"estimationMethod":"TIME_SLICER_AVERAGE","datetime":"2026-04-27T12:00:00Z"}"#;
let (v3_endpoint, v3_server) = spawn_one_shot_server(http_200(body)).await;
let (v4_endpoint, v4_server) = spawn_one_shot_server(http_200(body)).await;
let client = http_client::build_client();
let v3_reading = fetch_intensity_test(&client, &v3_endpoint, "tok", "FR")
.await
.expect("v3 mock must succeed");
let v4_reading = fetch_intensity_test(&client, &v4_endpoint, "tok", "FR")
.await
.expect("v4 mock must succeed");
assert!((v3_reading.gco2_per_kwh - v4_reading.gco2_per_kwh).abs() < 1e-10);
assert_eq!(v3_reading.is_estimated, v4_reading.is_estimated);
assert_eq!(v3_reading.estimation_method, v4_reading.estimation_method);
v3_server.await.unwrap();
v4_server.await.unwrap();
}
#[tokio::test]
async fn fetch_intensity_accepts_unknown_estimation_method_string() {
let body = r#"{"zone":"FR","carbonIntensity":56.0,"isEstimated":true,"estimationMethod":"FUTURE_ALGO_42"}"#;
let (endpoint, server) = spawn_one_shot_server(http_200(body)).await;
let client = http_client::build_client();
let reading = fetch_intensity_test(&client, &endpoint, "tok", "FR")
.await
.expect("200 + valid JSON should succeed");
assert_eq!(reading.is_estimated, Some(true));
assert_eq!(reading.estimation_method.as_deref(), Some("FUTURE_ALGO_42"));
server.await.unwrap();
}
#[tokio::test]
async fn fetch_intensity_rejects_non_200_with_http_status_error() {
let (endpoint, server) = spawn_one_shot_server(http_status(401, "Unauthorized")).await;
let client = http_client::build_client();
let err = fetch_intensity_test(&client, &endpoint, "bad-token", "FR")
.await
.expect_err("401 must surface as HttpStatus");
match err {
EmapsScraperError::HttpStatus(401) => {}
other => panic!("expected HttpStatus(401), got {other:?}"),
}
server.await.unwrap();
}
#[tokio::test]
async fn fetch_intensity_rejects_server_error() {
let (endpoint, server) =
spawn_one_shot_server(http_status(503, "Service Unavailable")).await;
let client = http_client::build_client();
let err = fetch_intensity_test(&client, &endpoint, "tok", "FR")
.await
.expect_err("503 must surface as HttpStatus");
match err {
EmapsScraperError::HttpStatus(503) => {}
other => panic!("expected HttpStatus(503), got {other:?}"),
}
server.await.unwrap();
}
#[tokio::test]
async fn fetch_intensity_rejects_malformed_json() {
let (endpoint, server) = spawn_one_shot_server(http_200("not json at all")).await;
let client = http_client::build_client();
let err = fetch_intensity_test(&client, &endpoint, "tok", "FR")
.await
.expect_err("malformed JSON must surface as JsonParse");
assert_matches!(err, EmapsScraperError::JsonParse(_));
server.await.unwrap();
}
#[tokio::test]
async fn fetch_intensity_rejects_json_without_carbon_intensity_field() {
let (endpoint, server) = spawn_one_shot_server(http_200(r#"{"zone":"FR"}"#)).await;
let client = http_client::build_client();
let err = fetch_intensity_test(&client, &endpoint, "tok", "FR")
.await
.expect_err("missing field must surface as JsonParse");
assert_matches!(err, EmapsScraperError::JsonParse(_));
server.await.unwrap();
}
#[tokio::test]
async fn fetch_intensity_rejects_negative_carbon_intensity() {
let body = r#"{"carbonIntensity":-5.0}"#;
let (endpoint, server) = spawn_one_shot_server(http_200(body)).await;
let client = http_client::build_client();
let err = fetch_intensity_test(&client, &endpoint, "tok", "FR")
.await
.expect_err("negative intensity must be rejected");
match err {
EmapsScraperError::JsonParse(msg) => {
assert!(msg.contains("invalid carbon intensity"));
}
other => panic!("expected JsonParse, got {other:?}"),
}
server.await.unwrap();
}
#[tokio::test]
async fn fetch_intensity_rejects_invalid_uri() {
let client = http_client::build_client();
let err = fetch_intensity_test(&client, "not a uri :: bad", "tok", "FR")
.await
.expect_err("invalid URI must surface as InvalidUri");
assert_matches!(err, EmapsScraperError::InvalidUri(_));
}
#[test]
fn emaps_scraper_error_display_messages_are_informative() {
let e1 = EmapsScraperError::InvalidUri("bad".to_string());
let e2 = EmapsScraperError::BodyRead("oops".to_string());
let e3 = EmapsScraperError::HttpStatus(429);
let e4 = EmapsScraperError::Timeout;
let e5 = EmapsScraperError::JsonParse("nope".to_string());
assert!(format!("{e1}").contains("invalid API URI"));
assert!(format!("{e2}").contains("body read"));
assert!(format!("{e3}").contains("429"));
assert!(format!("{e4}").contains("timed out"));
assert!(format!("{e5}").contains("JSON parse"));
}
#[tokio::test]
async fn spawn_scraper_returns_joinhandle_and_aborts_cleanly() {
let mut region_map = HashMap::new();
region_map.insert("eu-west-3".to_string(), "FR".to_string());
let config = ElectricityMapsConfig {
api_endpoint: "http://127.0.0.1:1".to_string(), auth_token: "tok".to_string(),
poll_interval: std::time::Duration::from_hours(1), region_map,
emission_factor_type: EmissionFactorType::default(),
temporal_granularity: TemporalGranularity::default(),
};
let state = Arc::new(ElectricityMapsState::default());
let handle = spawn_electricity_maps_scraper(config, state);
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
handle.abort();
let _ = handle.await;
}
async fn spawn_counting_server(
responses: HashMap<String, String>,
) -> (
String,
Arc<std::sync::atomic::AtomicUsize>,
tokio::task::JoinHandle<()>,
) {
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let endpoint = format!("http://{addr}");
let counter = Arc::new(std::sync::atomic::AtomicUsize::new(0));
let counter_clone = counter.clone();
let handle = tokio::spawn(async move {
loop {
let Ok((mut socket, _)) = listener.accept().await else {
return;
};
let counter = counter_clone.clone();
let responses = responses.clone();
tokio::spawn(async move {
let mut buf = [0u8; 4096];
let n = socket.read(&mut buf).await.unwrap_or(0);
counter.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
let req = std::str::from_utf8(&buf[..n]).unwrap_or("");
let zone = req
.lines()
.next()
.and_then(|line| line.split("zone=").nth(1))
.and_then(|tail| tail.split_whitespace().next())
.unwrap_or("");
let resp = match responses.get(zone) {
Some(body) => format!(
"HTTP/1.1 200 OK\r\nContent-Type: application/json\r\n\
Content-Length: {}\r\nConnection: close\r\n\r\n{body}",
body.len()
),
None => "HTTP/1.1 503 Service Unavailable\r\n\
Content-Length: 0\r\nConnection: close\r\n\r\n"
.to_string(),
};
let _ = socket.write_all(resp.as_bytes()).await;
let _ = socket.shutdown().await;
});
}
});
(endpoint, counter, handle)
}
#[tokio::test]
async fn run_scraper_loop_dedups_zones_when_cloud_regions_share_zone() {
let mut responses = HashMap::new();
responses.insert(
"FR".to_string(),
r#"{"zone":"FR","carbonIntensity":56.0}"#.to_string(),
);
responses.insert(
"DE".to_string(),
r#"{"zone":"DE","carbonIntensity":380.0}"#.to_string(),
);
let (endpoint, counter, server_handle) = spawn_counting_server(responses).await;
let mut region_map = HashMap::new();
region_map.insert("aws:eu-west-3".to_string(), "FR".to_string());
region_map.insert("local-k3d".to_string(), "FR".to_string());
region_map.insert("aws:eu-central-1".to_string(), "DE".to_string());
let config = ElectricityMapsConfig {
api_endpoint: endpoint,
auth_token: "tok".to_string(),
poll_interval: std::time::Duration::from_mins(1),
region_map,
emission_factor_type: EmissionFactorType::default(),
temporal_granularity: TemporalGranularity::default(),
};
let state = ElectricityMapsState::new();
let scraper_handle = spawn_electricity_maps_scraper(config, state.clone());
tokio::time::sleep(std::time::Duration::from_millis(120)).await;
scraper_handle.abort();
server_handle.abort();
let count = counter.load(std::sync::atomic::Ordering::SeqCst);
assert_eq!(
count, 2,
"expected 2 API calls (one per unique zone), got {count}"
);
let snap = state.snapshot(monotonic_ms() + 1_000_000, u64::MAX);
assert!((snap["aws:eu-west-3"] - 56.0).abs() < 1e-10);
assert!((snap["local-k3d"] - 56.0).abs() < 1e-10);
assert!((snap["aws:eu-central-1"] - 380.0).abs() < 1e-10);
}
#[tokio::test]
async fn run_scraper_loop_propagates_estimation_metadata_into_state() {
let mut responses = HashMap::new();
responses.insert(
"FR".to_string(),
r#"{"zone":"FR","carbonIntensity":56.0,"isEstimated":true,"estimationMethod":"TIME_SLICER_AVERAGE"}"#
.to_string(),
);
let (endpoint, _counter, server_handle) = spawn_counting_server(responses).await;
let mut region_map = HashMap::new();
region_map.insert("aws:eu-west-3".to_string(), "FR".to_string());
region_map.insert("local-k3d".to_string(), "FR".to_string());
let config = ElectricityMapsConfig {
api_endpoint: endpoint,
auth_token: "tok".to_string(),
poll_interval: std::time::Duration::from_mins(1),
region_map,
emission_factor_type: EmissionFactorType::default(),
temporal_granularity: TemporalGranularity::default(),
};
let state = ElectricityMapsState::new();
let scraper_handle = spawn_electricity_maps_scraper(config, state.clone());
tokio::time::sleep(std::time::Duration::from_millis(120)).await;
scraper_handle.abort();
server_handle.abort();
let snap = state.snapshot_with_metadata(monotonic_ms() + 1_000_000, u64::MAX);
for region in ["aws:eu-west-3", "local-k3d"] {
let entry = snap.get(region).expect("region present");
assert_eq!(entry.is_estimated, Some(true));
assert_eq!(
entry.estimation_method.as_deref(),
Some("TIME_SLICER_AVERAGE")
);
}
}
#[tokio::test]
async fn run_scraper_loop_publishes_state_when_some_zones_succeed_and_others_fail() {
let mut responses = HashMap::new();
responses.insert(
"FR".to_string(),
r#"{"zone":"FR","carbonIntensity":56.0}"#.to_string(),
);
let (endpoint, _counter, server_handle) = spawn_counting_server(responses).await;
let mut region_map = HashMap::new();
region_map.insert("aws:eu-west-3".to_string(), "FR".to_string());
region_map.insert("aws:eu-central-1".to_string(), "DE".to_string());
let state = ElectricityMapsState::new();
state.insert_for_test("aws:eu-central-1".into(), 999.0, 1);
let config = ElectricityMapsConfig {
api_endpoint: endpoint,
auth_token: "tok".to_string(),
poll_interval: std::time::Duration::from_mins(1),
region_map,
emission_factor_type: EmissionFactorType::default(),
temporal_granularity: TemporalGranularity::default(),
};
let scraper_handle = spawn_electricity_maps_scraper(config, state.clone());
tokio::time::sleep(std::time::Duration::from_millis(120)).await;
scraper_handle.abort();
server_handle.abort();
let snap = state.snapshot(monotonic_ms() + 1_000_000, u64::MAX);
let fr = snap
.get("aws:eu-west-3")
.copied()
.expect("FR cloud_region must be present after a partial-success publish");
assert!(
(fr - 56.0).abs() < 1e-10,
"FR cloud_region must carry the fresh 56.0 reading, got {fr}"
);
let de = snap
.get("aws:eu-central-1")
.copied()
.expect("DE cloud_region must keep its pre-seeded stale value after a 503");
assert!(
(de - 999.0).abs() < 1e-10,
"DE cloud_region must preserve the stale 999.0 reading after the 503, got {de}"
);
}
}