use std::time::Duration;
use anyhow::{Context, Result, anyhow};
use serde::Deserialize;
use crate::scenario::{Resource, Stack};
pub struct PromClient {
base_url: String,
http: reqwest::Client,
}
impl PromClient {
pub fn new(base_url: impl Into<String>) -> Result<Self> {
let http = reqwest::Client::builder()
.timeout(Duration::from_secs(15))
.build()
.context("build reqwest client")?;
Ok(Self {
base_url: base_url.into().trim_end_matches('/').to_string(),
http,
})
}
pub async fn query_scalar_sum(&self, query: &str) -> Result<Option<f64>> {
let url = format!("{}/api/v1/query", self.base_url);
let body: PromResp = self
.http
.get(&url)
.query(&[("query", query)])
.send()
.await
.with_context(|| format!("GET {url} query={query}"))?
.error_for_status()
.with_context(|| "prometheus non-2xx")?
.json()
.await
.context("decode prometheus json")?;
if body.status != "success" {
return Err(anyhow!(
"prometheus query failed: status={} err={:?}",
body.status,
body.error
));
}
let Some(data) = body.data else {
return Ok(None);
};
if data.result.is_empty() {
return Ok(None);
}
let mut sum = 0.0_f64;
let mut had = false;
for r in &data.result {
if let Some((_, v)) = r.value.as_ref()
&& let Ok(parsed) = v.parse::<f64>()
{
sum += parsed;
had = true;
}
}
Ok(had.then_some(sum))
}
pub async fn capture_resource(
&self,
stack: Stack,
namespace: &str,
window_s: u64,
msgs_produced: u64,
) -> Result<Resource> {
let pod_re = stack.broker_pod_regex();
let win = window_s.max(15);
let cpu_query = format!(
"sum(rate(container_cpu_usage_seconds_total{{namespace=\"{namespace}\",pod=~\"{pod_re}\",container!=\"\"}}[{win}s]) * {win})"
);
let rss_query = format!(
"max_over_time(sum(container_memory_working_set_bytes{{namespace=\"{namespace}\",pod=~\"{pod_re}\",container!=\"\"}})[{win}s:15s])"
);
let broker_cpu_seconds = self.query_scalar_sum(&cpu_query).await?.unwrap_or(0.0);
let mem_working = self.query_scalar_sum(&rss_query).await?.unwrap_or(0.0) as u64;
let mut res = Resource {
broker_cpu_seconds,
mem_cgroup_working_set_bytes: mem_working,
jvm_heap_used_bytes: None,
jvm_nonheap_used_bytes: None,
kafka_page_cache_approx_bytes: None,
msgs_per_cpu_core: if broker_cpu_seconds > 0.0 {
msgs_produced as f64 / broker_cpu_seconds
} else {
0.0
},
};
if matches!(stack, Stack::Kafka) {
let heap_q = format!(
"max_over_time(sum(jvm_memory_bytes_used{{namespace=\"{namespace}\",pod=~\"{pod_re}\",area=\"heap\"}})[{win}s:15s])"
);
let nonheap_q = format!(
"max_over_time(sum(jvm_memory_bytes_used{{namespace=\"{namespace}\",pod=~\"{pod_re}\",area=\"nonheap\"}})[{win}s:15s])"
);
let heap = self.query_scalar_sum(&heap_q).await?.unwrap_or(0.0) as u64;
let nonheap = self.query_scalar_sum(&nonheap_q).await?.unwrap_or(0.0) as u64;
res.jvm_heap_used_bytes = Some(heap);
res.jvm_nonheap_used_bytes = Some(nonheap);
res.kafka_page_cache_approx_bytes =
Some(mem_working as i64 - heap as i64 - nonheap as i64);
}
Ok(res)
}
}
/// Envelope of a Prometheus HTTP API response.
///
/// Only the fields this client reads are modelled; unknown keys are ignored by serde.
#[derive(Deserialize, Debug)]
struct PromResp {
    /// `"success"` on success; anything else is treated as a failure by the caller.
    status: String,
    /// Human-readable failure reason, if the server sent one.
    #[serde(default)]
    error: Option<String>,
    /// Query payload; missing (→ `None`) on responses that carry no data.
    #[serde(default)]
    data: Option<PromData>,
}

/// The `data` object of a query response (`resultType` is not modelled).
#[derive(Deserialize, Debug)]
struct PromData {
    /// One entry per returned series; an absent key decodes as an empty list.
    #[serde(default)]
    result: Vec<PromResult>,
}

/// One series of an instant-query result.
#[derive(Deserialize, Debug)]
struct PromResult {
    /// `[timestamp, "value"]`. The sample value arrives as a string and is parsed later.
    #[serde(default)]
    value: Option<(f64, String)>,
}
#[cfg(test)]
mod tests {
    use super::*;
    use assert2::assert;

    /// A standard vector response decodes, including the string-encoded sample value.
    #[test]
    fn parses_success_with_one_result() {
        let json = r#"{
            "status": "success",
            "data": {
                "resultType": "vector",
                "result": [
                    { "metric": {}, "value": [1234.0, "12.5"] }
                ]
            }
        }"#;
        let p: PromResp = serde_json::from_str(json).unwrap();
        assert!(p.status == "success");
        let data = p.data.unwrap();
        assert!(data.result.len() == 1);
        assert!(data.result[0].value.as_ref().map(|(_, v)| v.as_str()) == Some("12.5"));
    }

    /// An empty result set decodes to an empty `Vec`, not an error.
    #[test]
    fn parses_empty_result_set() {
        let json = r#"{"status":"success","data":{"resultType":"vector","result":[]}}"#;
        let p: PromResp = serde_json::from_str(json).unwrap();
        assert!(p.data.unwrap().result.is_empty());
    }

    /// An error envelope decodes with its message and without `data`.
    #[test]
    fn parses_error_response() {
        let json = r#"{"status":"error","error":"bad query"}"#;
        let p: PromResp = serde_json::from_str(json).unwrap();
        assert!(p.status == "error");
        assert!(p.error.as_deref() == Some("bad query"));
        assert!(p.data.is_none());
    }

    /// Unmodelled keys such as `errorType` are ignored rather than rejected.
    #[test]
    fn ignores_unknown_error_fields() {
        let json = r#"{"status":"error","errorType":"bad_data","error":"parse error"}"#;
        let p: PromResp = serde_json::from_str(json).unwrap();
        assert!(p.error.as_deref() == Some("parse error"));
    }

    /// A series without `value` decodes to `None`. A fractional timestamp and a
    /// `"NaN"` value string also decode.
    #[test]
    fn parses_missing_value_and_fractional_timestamp() {
        let json = r#"{"status":"success","data":{"resultType":"vector","result":[
            {"metric":{}},
            {"metric":{},"value":[1712345678.123,"NaN"]}
        ]}}"#;
        let p: PromResp = serde_json::from_str(json).unwrap();
        let data = p.data.unwrap();
        assert!(data.result.len() == 2);
        assert!(data.result[0].value.is_none());
        let (ts, raw) = data.result[1].value.clone().unwrap();
        assert!(ts == 1712345678.123);
        assert!(raw == "NaN");
    }
}