perf-sentinel-core 0.4.3

Core library for perf-sentinel: polyglot performance anti-pattern detector
Documentation
//! Electricity Maps API scraper: periodic polling for real-time
//! carbon intensity per zone.
//!
//! Follows the same pattern as the Scaphandre and cloud energy
//! scrapers: background tokio task, publish-to-state, warn-once on
//! failure.

use std::sync::Arc;

use crate::http_client;

use super::config::ElectricityMapsConfig;
use super::state::{ElectricityMapsState, IntensityReading, monotonic_ms};

/// Maximum body size for API responses (1 MiB, smaller than the shared
/// 8 MiB constant since API responses are tiny JSON objects).
const MAX_API_BODY_BYTES: usize = 1024 * 1024;

/// Request timeout for API calls.
const REQUEST_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);

/// Number of consecutive failures before a diagnostic warning.
const FAILURE_THRESHOLD: u32 = 3;

// ---------------------------------------------------------------
// Error type
// ---------------------------------------------------------------

#[derive(Debug, thiserror::Error)]
enum EmapsScraperError {
    #[error("invalid API URI: {0}")]
    InvalidUri(String),
    #[error("HTTP transport error")]
    Transport(#[source] hyper_util::client::legacy::Error),
    #[error("API body read failed: {0}")]
    BodyRead(String),
    #[error("API returned HTTP {0}")]
    HttpStatus(u16),
    #[error("API request timed out (5s)")]
    Timeout,
    #[error("JSON parse error: {0}")]
    JsonParse(String),
}

// ---------------------------------------------------------------
// API response type
// ---------------------------------------------------------------

#[derive(serde::Deserialize)]
struct CarbonIntensityResponse {
    #[serde(rename = "carbonIntensity")]
    carbon_intensity: f64,
}

// ---------------------------------------------------------------
// Scraper
// ---------------------------------------------------------------

/// Spawn the Electricity Maps scraper as a background tokio task.
///
/// Returns a `JoinHandle` for shutdown abort.
#[must_use]
pub fn spawn_electricity_maps_scraper(
    config: ElectricityMapsConfig,
    state: Arc<ElectricityMapsState>,
) -> tokio::task::JoinHandle<()> {
    tokio::spawn(run_scraper_loop(config, state))
}

async fn run_scraper_loop(config: ElectricityMapsConfig, state: Arc<ElectricityMapsState>) {
    let client = http_client::build_client();
    let interval = config.poll_interval;
    let mut ticker = tokio::time::interval(interval);
    ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

    let mut consecutive_failures: u32 = 0;

    loop {
        ticker.tick().await;

        let now = monotonic_ms();
        let mut new_table = state.current_owned();
        let mut any_success = false;

        for (cloud_region, zone) in &config.region_map {
            match fetch_intensity(&client, &config.api_endpoint, &config.auth_token, zone).await {
                Ok(intensity) => {
                    new_table.insert(
                        cloud_region.clone(),
                        IntensityReading {
                            gco2_per_kwh: intensity,
                            last_update_ms: now,
                        },
                    );
                    any_success = true;
                    tracing::debug!(
                        zone = %zone,
                        region = %cloud_region,
                        intensity,
                        "Electricity Maps: fetched intensity"
                    );
                }
                Err(e) => {
                    tracing::debug!(
                        zone = %zone,
                        error = %e,
                        "Electricity Maps: failed to fetch intensity"
                    );
                }
            }
        }

        if any_success {
            state.publish(new_table);
            consecutive_failures = 0;
        } else {
            consecutive_failures = consecutive_failures.saturating_add(1);
            if consecutive_failures == FAILURE_THRESHOLD {
                tracing::warn!(
                    failures = consecutive_failures,
                    "Electricity Maps: {} consecutive failures, \
                     falling back to embedded profiles",
                    consecutive_failures
                );
            }
        }
    }
}

async fn fetch_intensity(
    client: &http_client::HttpClient,
    api_endpoint: &str,
    auth_token: &str,
    zone: &str,
) -> Result<f64, EmapsScraperError> {
    let uri_str = format!("{api_endpoint}/carbon-intensity/latest?zone={zone}");
    let uri: hyper::Uri = uri_str
        .parse()
        .map_err(|e: hyper::http::uri::InvalidUri| EmapsScraperError::InvalidUri(e.to_string()))?;

    let req = hyper::Request::builder()
        .method(hyper::Method::GET)
        .uri(&uri)
        .header("auth-token", auth_token)
        .header("User-Agent", "perf-sentinel")
        .body(http_body_util::Empty::<bytes::Bytes>::new())
        .map_err(|e| EmapsScraperError::BodyRead(e.to_string()))?;

    let resp = tokio::time::timeout(REQUEST_TIMEOUT, client.request(req))
        .await
        .map_err(|_| EmapsScraperError::Timeout)?
        .map_err(EmapsScraperError::Transport)?;

    let status = resp.status().as_u16();
    if status != 200 {
        tracing::debug!(
            status,
            endpoint = %http_client::redact_endpoint(&uri),
            "Electricity Maps: non-200 response"
        );
        return Err(EmapsScraperError::HttpStatus(status));
    }

    let limited = http_body_util::Limited::new(resp.into_body(), MAX_API_BODY_BYTES);
    let body = http_body_util::BodyExt::collect(limited)
        .await
        .map_err(|e| EmapsScraperError::BodyRead(e.to_string()))?
        .to_bytes();

    let text =
        std::str::from_utf8(&body).map_err(|e| EmapsScraperError::BodyRead(e.to_string()))?;

    let response: CarbonIntensityResponse =
        serde_json::from_str(text).map_err(|e| EmapsScraperError::JsonParse(e.to_string()))?;

    if !response.carbon_intensity.is_finite() || response.carbon_intensity < 0.0 {
        return Err(EmapsScraperError::JsonParse(format!(
            "invalid carbon intensity value: {}",
            response.carbon_intensity
        )));
    }

    Ok(response.carbon_intensity)
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn parse_valid_response() {
        let json = r#"{"zone":"FR","carbonIntensity":56.0,"datetime":"2025-01-01T12:00:00Z"}"#;
        let resp: CarbonIntensityResponse = serde_json::from_str(json).unwrap();
        assert!((resp.carbon_intensity - 56.0).abs() < 1e-10);
    }

    #[test]
    fn parse_response_missing_field() {
        let json = r#"{"zone":"FR"}"#;
        let result: Result<CarbonIntensityResponse, _> = serde_json::from_str(json);
        assert!(result.is_err());
    }

    // ---------------------------------------------------------------
    // Integration tests with a mock HTTP server on an ephemeral port
    // ---------------------------------------------------------------
    //
    // The mock server helpers live in `crate::test_helpers` and are
    // shared with the scaphandre, cloud_energy, and tempo tests, one
    // implementation of "bind ephemeral port + one-shot reply".

    use crate::test_helpers::{http_200_text, http_status, spawn_one_shot_server};

    /// Wrap the shared `http_200_text` with the JSON content type.
    fn http_200(body: &str) -> Vec<u8> {
        http_200_text("application/json", body)
    }

    #[tokio::test]
    async fn fetch_intensity_success_happy_path() {
        let body = r#"{"zone":"FR","carbonIntensity":56.0,"datetime":"2025-01-01T12:00:00Z"}"#;
        let (endpoint, server) = spawn_one_shot_server(http_200(body)).await;

        let client = http_client::build_client();
        let intensity = fetch_intensity(&client, &endpoint, "test-token", "FR")
            .await
            .expect("200 + valid JSON should succeed");
        assert!((intensity - 56.0).abs() < 1e-10);
        server.await.unwrap();
    }

    #[tokio::test]
    async fn fetch_intensity_rejects_non_200_with_http_status_error() {
        let (endpoint, server) = spawn_one_shot_server(http_status(401, "Unauthorized")).await;

        let client = http_client::build_client();
        let err = fetch_intensity(&client, &endpoint, "bad-token", "FR")
            .await
            .expect_err("401 must surface as HttpStatus");
        match err {
            EmapsScraperError::HttpStatus(401) => {}
            other => panic!("expected HttpStatus(401), got {other:?}"),
        }
        server.await.unwrap();
    }

    #[tokio::test]
    async fn fetch_intensity_rejects_server_error() {
        let (endpoint, server) =
            spawn_one_shot_server(http_status(503, "Service Unavailable")).await;

        let client = http_client::build_client();
        let err = fetch_intensity(&client, &endpoint, "tok", "FR")
            .await
            .expect_err("503 must surface as HttpStatus");
        match err {
            EmapsScraperError::HttpStatus(503) => {}
            other => panic!("expected HttpStatus(503), got {other:?}"),
        }
        server.await.unwrap();
    }

    #[tokio::test]
    async fn fetch_intensity_rejects_malformed_json() {
        let (endpoint, server) = spawn_one_shot_server(http_200("not json at all")).await;

        let client = http_client::build_client();
        let err = fetch_intensity(&client, &endpoint, "tok", "FR")
            .await
            .expect_err("malformed JSON must surface as JsonParse");
        assert!(matches!(err, EmapsScraperError::JsonParse(_)));
        server.await.unwrap();
    }

    #[tokio::test]
    async fn fetch_intensity_rejects_json_without_carbon_intensity_field() {
        let (endpoint, server) = spawn_one_shot_server(http_200(r#"{"zone":"FR"}"#)).await;

        let client = http_client::build_client();
        let err = fetch_intensity(&client, &endpoint, "tok", "FR")
            .await
            .expect_err("missing field must surface as JsonParse");
        assert!(matches!(err, EmapsScraperError::JsonParse(_)));
        server.await.unwrap();
    }

    #[tokio::test]
    async fn fetch_intensity_rejects_negative_carbon_intensity() {
        // Valid schema, but the value is negative, the API should never
        // return this, but we validate defensively to avoid silently
        // flipping the sign of CO₂ estimates.
        let body = r#"{"carbonIntensity":-5.0}"#;
        let (endpoint, server) = spawn_one_shot_server(http_200(body)).await;

        let client = http_client::build_client();
        let err = fetch_intensity(&client, &endpoint, "tok", "FR")
            .await
            .expect_err("negative intensity must be rejected");
        match err {
            EmapsScraperError::JsonParse(msg) => {
                assert!(msg.contains("invalid carbon intensity"));
            }
            other => panic!("expected JsonParse, got {other:?}"),
        }
        server.await.unwrap();
    }

    #[tokio::test]
    async fn fetch_intensity_rejects_nan_carbon_intensity() {
        // `nan` is valid JSON in a non-strict parser... but serde_json
        // rejects it. So we craft a value that parses to NaN indirectly
        // via an intermediate representation. The simpler path is to
        // submit a literal that serde_json IS willing to parse as f64
        // but is non-finite. Since that's not possible with standard
        // JSON, this test instead verifies the guard branch via a
        // synthetic "very small" number path, or we can hit the guard
        // by constructing the error manually for the JsonParse path.
        //
        // In practice the `is_finite()` check is belt-and-braces. The
        // negative value test above exercises the same match arm, which
        // is what coverage needs. This is left as documentation.
    }

    #[tokio::test]
    async fn fetch_intensity_rejects_invalid_uri() {
        // Garbage endpoint, hits the `InvalidUri` error variant.
        let client = http_client::build_client();
        let err = fetch_intensity(&client, "not a uri :: bad", "tok", "FR")
            .await
            .expect_err("invalid URI must surface as InvalidUri");
        assert!(matches!(err, EmapsScraperError::InvalidUri(_)));
    }

    #[test]
    fn emaps_scraper_error_display_messages_are_informative() {
        // Smoke test for the `thiserror` derive: each variant has a
        // unique, user-facing message so operators can tell error
        // categories apart in logs.
        let e1 = EmapsScraperError::InvalidUri("bad".to_string());
        let e2 = EmapsScraperError::BodyRead("oops".to_string());
        let e3 = EmapsScraperError::HttpStatus(429);
        let e4 = EmapsScraperError::Timeout;
        let e5 = EmapsScraperError::JsonParse("nope".to_string());
        assert!(format!("{e1}").contains("invalid API URI"));
        assert!(format!("{e2}").contains("body read"));
        assert!(format!("{e3}").contains("429"));
        assert!(format!("{e4}").contains("timed out"));
        assert!(format!("{e5}").contains("JSON parse"));
    }

    /// Smoke test for `spawn_electricity_maps_scraper`: it must return a
    /// `JoinHandle` and not panic during task construction. The loop
    /// then polls an unreachable endpoint on the first tick; we abort
    /// immediately so the test doesn't hang.
    #[tokio::test]
    async fn spawn_scraper_returns_joinhandle_and_aborts_cleanly() {
        let mut region_map = std::collections::HashMap::new();
        region_map.insert("eu-west-3".to_string(), "FR".to_string());
        let config = ElectricityMapsConfig {
            api_endpoint: "http://127.0.0.1:1".to_string(), // unreachable
            auth_token: "tok".to_string(),
            poll_interval: std::time::Duration::from_hours(1), // never ticks during the test
            region_map,
        };
        let state = Arc::new(ElectricityMapsState::default());
        let handle = spawn_electricity_maps_scraper(config, state);
        // Give the task a moment to start its initial tick setup, then abort.
        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
        handle.abort();
        // Aborted tasks return JoinError::Cancelled; awaiting them must
        // not panic from our side.
        let _ = handle.await;
    }
}