crabka-bench-driver 0.3.6

Load driver + report aggregator for the Crabka vs Strimzi benchmark harness
Documentation
//! Prometheus query client. The driver issues a small set of instant
//! queries at scenario end to capture resource usage on the broker pods
//! and (Strimzi only) JVM heap / non-heap from the JMX exporter.

use std::time::Duration;

use anyhow::{Context, Result, anyhow};
use serde::Deserialize;

use crate::scenario::{Resource, Stack};

pub struct PromClient {
    base_url: String,
    http: reqwest::Client,
}

impl PromClient {
    /// `base_url` is the Prometheus root, e.g. `http://prom.monitoring.svc:9090`.
    /// No trailing slash required.
    pub fn new(base_url: impl Into<String>) -> Result<Self> {
        let http = reqwest::Client::builder()
            .timeout(Duration::from_secs(15))
            .build()
            .context("build reqwest client")?;
        Ok(Self {
            base_url: base_url.into().trim_end_matches('/').to_string(),
            http,
        })
    }

    /// Execute a `PromQL` instant query. Returns the first scalar value
    /// across all returned series, summed. Returns `None` when the result
    /// vector is empty.
    pub async fn query_scalar_sum(&self, query: &str) -> Result<Option<f64>> {
        let url = format!("{}/api/v1/query", self.base_url);
        let body: PromResp = self
            .http
            .get(&url)
            .query(&[("query", query)])
            .send()
            .await
            .with_context(|| format!("GET {url} query={query}"))?
            .error_for_status()
            .with_context(|| "prometheus non-2xx")?
            .json()
            .await
            .context("decode prometheus json")?;

        if body.status != "success" {
            return Err(anyhow!(
                "prometheus query failed: status={} err={:?}",
                body.status,
                body.error
            ));
        }
        let Some(data) = body.data else {
            return Ok(None);
        };
        if data.result.is_empty() {
            return Ok(None);
        }
        let mut sum = 0.0_f64;
        let mut had = false;
        for r in &data.result {
            if let Some((_, v)) = r.value.as_ref()
                && let Ok(parsed) = v.parse::<f64>()
            {
                sum += parsed;
                had = true;
            }
        }
        Ok(had.then_some(sum))
    }

    /// Capture broker resource usage for the given stack over a window
    /// ending now. `window_s` should be slightly larger than the measured
    /// scenario duration so the `rate()` window doesn't tail off.
    pub async fn capture_resource(
        &self,
        stack: Stack,
        namespace: &str,
        window_s: u64,
        msgs_produced: u64,
    ) -> Result<Resource> {
        let pod_re = stack.broker_pod_regex();
        let win = window_s.max(15); // PromQL needs at least one full scrape

        let cpu_query = format!(
            "sum(rate(container_cpu_usage_seconds_total{{namespace=\"{namespace}\",pod=~\"{pod_re}\",container!=\"\"}}[{win}s]) * {win})"
        );
        let rss_query = format!(
            "max_over_time(sum(container_memory_working_set_bytes{{namespace=\"{namespace}\",pod=~\"{pod_re}\",container!=\"\"}})[{win}s:15s])"
        );

        let broker_cpu_seconds = self.query_scalar_sum(&cpu_query).await?.unwrap_or(0.0);
        let mem_working = self.query_scalar_sum(&rss_query).await?.unwrap_or(0.0) as u64;

        let mut res = Resource {
            broker_cpu_seconds,
            mem_cgroup_working_set_bytes: mem_working,
            jvm_heap_used_bytes: None,
            jvm_nonheap_used_bytes: None,
            kafka_page_cache_approx_bytes: None,
            msgs_per_cpu_core: if broker_cpu_seconds > 0.0 {
                msgs_produced as f64 / broker_cpu_seconds
            } else {
                0.0
            },
        };

        if matches!(stack, Stack::Kafka) {
            // JMX exporter publishes `jvm_memory_bytes_used{area="..."}`.
            let heap_q = format!(
                "max_over_time(sum(jvm_memory_bytes_used{{namespace=\"{namespace}\",pod=~\"{pod_re}\",area=\"heap\"}})[{win}s:15s])"
            );
            let nonheap_q = format!(
                "max_over_time(sum(jvm_memory_bytes_used{{namespace=\"{namespace}\",pod=~\"{pod_re}\",area=\"nonheap\"}})[{win}s:15s])"
            );
            let heap = self.query_scalar_sum(&heap_q).await?.unwrap_or(0.0) as u64;
            let nonheap = self.query_scalar_sum(&nonheap_q).await?.unwrap_or(0.0) as u64;
            res.jvm_heap_used_bytes = Some(heap);
            res.jvm_nonheap_used_bytes = Some(nonheap);
            res.kafka_page_cache_approx_bytes =
                Some(mem_working as i64 - heap as i64 - nonheap as i64);
        }

        Ok(res)
    }
}

// ── Prometheus HTTP API types (minimal) ─────────────────────────────────────

#[derive(Debug, Deserialize)]
struct PromResp {
    status: String,
    #[serde(default)]
    error: Option<String>,
    #[serde(default)]
    data: Option<PromData>,
}

#[derive(Debug, Deserialize)]
struct PromData {
    #[serde(default)]
    result: Vec<PromResult>,
}

#[derive(Debug, Deserialize)]
struct PromResult {
    /// `[<unix-ts>, "<string-value>"]` — Prometheus encodes the scalar
    /// portion as a JSON string. The timestamp is f64 in seconds.
    #[serde(default)]
    value: Option<(f64, String)>,
}

#[cfg(test)]
mod tests {
    use super::*;
    use assert2::assert;

    #[test]
    fn parses_success_with_one_result() {
        let json = r#"{
          "status": "success",
          "data": {
            "resultType": "vector",
            "result": [
              { "metric": {}, "value": [1234.0, "12.5"] }
            ]
          }
        }"#;
        let p: PromResp = serde_json::from_str(json).unwrap();
        assert!(p.status == "success");
        assert!(p.data.unwrap().result.len() == 1);
    }

    #[test]
    fn parses_empty_result_set() {
        let json = r#"{"status":"success","data":{"resultType":"vector","result":[]}}"#;
        let p: PromResp = serde_json::from_str(json).unwrap();
        assert!(p.data.unwrap().result.is_empty());
    }

    #[test]
    fn parses_error_response() {
        let json = r#"{"status":"error","error":"bad query"}"#;
        let p: PromResp = serde_json::from_str(json).unwrap();
        assert!(p.status == "error");
        assert!(p.error.as_deref() == Some("bad query"));
    }
}