use crate::detect::Finding;
use crate::normalize::sql::normalize_sql;
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct PgStatEntry {
pub query: String,
pub normalized_template: String,
pub calls: u64,
pub total_exec_time_ms: f64,
pub mean_exec_time_ms: f64,
pub rows: u64,
pub shared_blks_hit: u64,
pub shared_blks_read: u64,
#[serde(default)]
pub seen_in_traces: bool,
}
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct PgStatRanking {
pub label: String,
pub entries: Vec<PgStatEntry>,
}
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct PgStatReport {
pub total_entries: usize,
pub top_n: usize,
pub rankings: Vec<PgStatRanking>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PgStatFormat {
Csv,
Json,
}
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum PgStatError {
#[error("payload too large: {size} bytes exceeds maximum of {max} bytes")]
PayloadTooLarge { size: usize, max: usize },
#[error("CSV parse error at line {line}: {detail}")]
CsvParse { line: usize, detail: String },
#[error("JSON parse error: {0}")]
JsonParse(String),
#[error("missing required column: {0}")]
MissingColumn(String),
#[error("empty input")]
EmptyInput,
#[cfg(any(feature = "daemon", feature = "tempo"))]
#[error("Prometheus request failed: {0}")]
PrometheusRequest(String),
#[cfg(any(feature = "daemon", feature = "tempo"))]
#[error("Prometheus response parse error: {0}")]
PrometheusFormat(String),
}
#[derive(Deserialize)]
struct RawJsonEntry {
query: String,
calls: u64,
#[serde(alias = "total_exec_time")]
total_exec_time_ms: f64,
#[serde(alias = "mean_exec_time")]
mean_exec_time_ms: f64,
#[serde(default)]
rows: u64,
#[serde(default)]
shared_blks_hit: u64,
#[serde(default)]
shared_blks_read: u64,
}
#[must_use]
pub fn detect_pg_stat_format(raw: &[u8]) -> PgStatFormat {
let trimmed = raw.iter().position(|&b| !b.is_ascii_whitespace());
match trimmed.map(|i| raw[i]) {
Some(b'[' | b'{') => PgStatFormat::Json,
_ => PgStatFormat::Csv,
}
}
pub fn parse_pg_stat(raw: &[u8], max_size: usize) -> Result<Vec<PgStatEntry>, PgStatError> {
if raw.len() > max_size {
return Err(PgStatError::PayloadTooLarge {
size: raw.len(),
max: max_size,
});
}
if raw.is_empty() || raw.iter().all(|&b| b.is_ascii_whitespace()) {
return Err(PgStatError::EmptyInput);
}
let text = std::str::from_utf8(raw).map_err(|e| PgStatError::CsvParse {
line: 0,
detail: format!("invalid UTF-8: {e}"),
})?;
match detect_pg_stat_format(raw) {
PgStatFormat::Csv => parse_csv(text),
PgStatFormat::Json => parse_json(text),
}
}
#[must_use]
pub fn rank_pg_stat(entries: &[PgStatEntry], top_n: usize) -> PgStatReport {
let total_entries = entries.len();
let top_n_by =
|cmp: fn(&PgStatEntry, &PgStatEntry) -> std::cmp::Ordering, label: &str| -> PgStatRanking {
let mut indices: Vec<usize> = (0..entries.len()).collect();
indices.sort_by(|&a, &b| cmp(&entries[a], &entries[b]));
indices.truncate(top_n);
PgStatRanking {
label: label.to_string(),
entries: indices.iter().map(|&i| entries[i].clone()).collect(),
}
};
let by_total_time = top_n_by(
|a, b| {
b.total_exec_time_ms
.partial_cmp(&a.total_exec_time_ms)
.unwrap_or(std::cmp::Ordering::Equal)
},
"top by total_exec_time",
);
let by_calls = top_n_by(|a, b| b.calls.cmp(&a.calls), "top by calls");
let by_mean_time = top_n_by(
|a, b| {
b.mean_exec_time_ms
.partial_cmp(&a.mean_exec_time_ms)
.unwrap_or(std::cmp::Ordering::Equal)
},
"top by mean_exec_time",
);
PgStatReport {
total_entries,
top_n,
rankings: vec![by_total_time, by_calls, by_mean_time],
}
}
pub fn cross_reference(entries: &mut [PgStatEntry], findings: &[Finding]) {
let templates: std::collections::HashSet<&str> = findings
.iter()
.map(|f| f.pattern.template.as_str())
.collect();
for entry in entries {
if templates.contains(entry.normalized_template.as_str()) {
entry.seen_in_traces = true;
}
}
}
const MAX_CSV_ROWS: usize = 1_000_000;
fn parse_csv(text: &str) -> Result<Vec<PgStatEntry>, PgStatError> {
let mut lines = text.lines();
let header_line = lines.next().ok_or(PgStatError::EmptyInput)?;
let headers = parse_csv_row(header_line);
let col = |name: &str| -> Result<usize, PgStatError> {
headers
.iter()
.position(|h| h.eq_ignore_ascii_case(name))
.ok_or_else(|| PgStatError::MissingColumn(name.to_string()))
};
let query_idx = col("query")?;
let calls_idx = col("calls")?;
let total_time_idx = col("total_exec_time")?;
let mean_time_idx = col("mean_exec_time")?;
let rows_idx = col("rows").ok();
let hit_idx = col("shared_blks_hit").ok();
let read_idx = col("shared_blks_read").ok();
let estimated = (text.len() / 100).min(100_000);
let mut entries = Vec::with_capacity(estimated);
for (line_num, line) in lines.enumerate() {
if entries.len() >= MAX_CSV_ROWS {
return Err(PgStatError::CsvParse {
line: line_num + 2,
detail: format!("CSV exceeds maximum of {MAX_CSV_ROWS} rows"),
});
}
let line = line.trim();
if line.is_empty() {
continue;
}
let fields = parse_csv_row(line);
let line_num = line_num + 2;
let query = fields.get(query_idx).cloned().unwrap_or_default();
let calls = parse_u64(&fields, calls_idx, line_num, "calls")?;
let total_exec_time_ms = parse_f64(&fields, total_time_idx, line_num, "total_exec_time")?;
let mean_exec_time_ms = parse_f64(&fields, mean_time_idx, line_num, "mean_exec_time")?;
let rows = rows_idx.map_or(Ok(0), |i| parse_u64(&fields, i, line_num, "rows"))?;
let shared_blks_hit = hit_idx.map_or(Ok(0), |i| {
parse_u64(&fields, i, line_num, "shared_blks_hit")
})?;
let shared_blks_read = read_idx.map_or(Ok(0), |i| {
parse_u64(&fields, i, line_num, "shared_blks_read")
})?;
let normalized = normalize_sql(&query);
entries.push(PgStatEntry {
query,
normalized_template: normalized.template,
calls,
total_exec_time_ms,
mean_exec_time_ms,
rows,
shared_blks_hit,
shared_blks_read,
seen_in_traces: false,
});
}
if entries.is_empty() {
return Err(PgStatError::EmptyInput);
}
Ok(entries)
}
fn parse_csv_row(line: &str) -> Vec<String> {
let mut fields = Vec::with_capacity(8);
let mut current = String::new();
let mut in_quotes = false;
let mut chars = line.chars().peekable();
while let Some(c) = chars.next() {
if in_quotes {
if c == '"' {
if chars.peek() == Some(&'"') {
current.push('"');
chars.next();
} else {
in_quotes = false;
}
} else {
current.push(c);
}
} else if c == '"' {
in_quotes = true;
} else if c == ',' {
fields.push(std::mem::take(&mut current));
} else {
current.push(c);
}
}
fields.push(current);
fields
}
fn parse_u64(
fields: &[String],
idx: usize,
line: usize,
col_name: &str,
) -> Result<u64, PgStatError> {
let val = fields.get(idx).map_or("", String::as_str).trim();
val.parse::<u64>().map_err(|_| PgStatError::CsvParse {
line,
detail: format!("cannot parse '{val}' as integer for column {col_name}"),
})
}
fn parse_f64(
fields: &[String],
idx: usize,
line: usize,
col_name: &str,
) -> Result<f64, PgStatError> {
let val = fields.get(idx).map_or("", String::as_str).trim();
val.parse::<f64>().map_err(|_| PgStatError::CsvParse {
line,
detail: format!("cannot parse '{val}' as float for column {col_name}"),
})
}
fn parse_json(text: &str) -> Result<Vec<PgStatEntry>, PgStatError> {
let raw_entries: Vec<RawJsonEntry> =
serde_json::from_str(text).map_err(|e| PgStatError::JsonParse(e.to_string()))?;
if raw_entries.is_empty() {
return Err(PgStatError::EmptyInput);
}
if raw_entries.len() > MAX_CSV_ROWS {
return Err(PgStatError::CsvParse {
line: 0,
detail: format!(
"JSON array exceeds maximum of {MAX_CSV_ROWS} entries (got {})",
raw_entries.len()
),
});
}
let entries = raw_entries
.into_iter()
.map(|raw| {
let normalized = normalize_sql(&raw.query);
PgStatEntry {
query: raw.query,
normalized_template: normalized.template,
calls: raw.calls,
total_exec_time_ms: raw.total_exec_time_ms,
mean_exec_time_ms: raw.mean_exec_time_ms,
rows: raw.rows,
shared_blks_hit: raw.shared_blks_hit,
shared_blks_read: raw.shared_blks_read,
seen_in_traces: false,
}
})
.collect();
Ok(entries)
}
#[cfg(any(feature = "daemon", feature = "tempo"))]
pub async fn fetch_from_prometheus(
endpoint: &str,
top_n: usize,
) -> Result<Vec<PgStatEntry>, PgStatError> {
validate_prometheus_endpoint(endpoint)?;
let client = crate::http_client::build_client();
let query = format!("topk({top_n}%2C%20pg_stat_statements_seconds_total)");
let url = format!("{endpoint}/api/v1/query?query={query}");
let uri: crate::http_client::Uri = url
.parse()
.map_err(|e| PgStatError::PrometheusRequest(format!("invalid URL: {e}")))?;
let timeout = std::time::Duration::from_secs(30);
let body = crate::http_client::fetch_get(&client, &uri, "perf-sentinel/pg-stat", timeout)
.await
.map_err(|e| {
PgStatError::PrometheusRequest(format!(
"{e} (endpoint: {})",
crate::http_client::redact_endpoint(&uri)
))
})?;
parse_prometheus_response(&body)
}
#[cfg(any(feature = "daemon", feature = "tempo"))]
fn validate_prometheus_endpoint(endpoint: &str) -> Result<(), PgStatError> {
let uri: crate::http_client::Uri = endpoint
.parse()
.map_err(|e| PgStatError::PrometheusRequest(format!("invalid endpoint URL: {e}")))?;
match uri.scheme_str() {
Some("http" | "https") => {}
Some(other) => {
return Err(PgStatError::PrometheusRequest(format!(
"unsupported scheme `{other}`, only http and https are accepted"
)));
}
None => {
return Err(PgStatError::PrometheusRequest(
"endpoint URL must include a scheme (http:// or https://)".to_string(),
));
}
}
if let Some(authority) = uri.authority()
&& authority.as_str().contains('@')
{
return Err(PgStatError::PrometheusRequest(
"credentials in the URL are not accepted; use env vars instead".to_string(),
));
}
Ok(())
}
#[cfg(any(feature = "daemon", feature = "tempo"))]
fn parse_prometheus_response(body: &[u8]) -> Result<Vec<PgStatEntry>, PgStatError> {
let json: serde_json::Value = serde_json::from_slice(body)
.map_err(|e| PgStatError::PrometheusFormat(format!("invalid JSON: {e}")))?;
let results = json
.get("data")
.and_then(|d| d.get("result"))
.and_then(|r| r.as_array())
.ok_or_else(|| PgStatError::PrometheusFormat("missing data.result array".to_string()))?;
let mut entries = Vec::with_capacity(results.len());
for result in results {
let metric = result.get("metric").unwrap_or(&serde_json::Value::Null);
let query_text = metric
.get("query")
.or_else(|| metric.get("queryid"))
.and_then(|v| v.as_str())
.unwrap_or("unknown")
.to_string();
let value = result
.get("value")
.and_then(|v| v.as_array())
.and_then(|arr| arr.get(1))
.and_then(|v| v.as_str())
.and_then(|s| s.parse::<f64>().ok())
.unwrap_or(0.0);
let total_exec_time_ms = value * 1000.0;
let calls = metric
.get("calls")
.and_then(serde_json::Value::as_str)
.and_then(|s| s.parse::<u64>().ok())
.unwrap_or(0);
#[allow(clippy::cast_precision_loss)]
let mean_exec_time_ms = if calls > 0 {
total_exec_time_ms / (calls as f64)
} else {
total_exec_time_ms
};
let normalized = normalize_sql(&query_text);
entries.push(PgStatEntry {
query: query_text,
normalized_template: normalized.template,
calls,
total_exec_time_ms,
mean_exec_time_ms,
rows: 0,
shared_blks_hit: 0,
shared_blks_read: 0,
seen_in_traces: false,
});
}
Ok(entries)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::detect::{Confidence, FindingType, Pattern, Severity};
fn sample_csv() -> &'static str {
"query,calls,total_exec_time,mean_exec_time,rows,shared_blks_hit,shared_blks_read\n\
SELECT * FROM order_item WHERE order_id = 42,1500,4500.50,3.000,1500,12000,150\n\
\"SELECT * FROM orders WHERE id = 1 AND status = 'active'\",800,2400.00,3.000,800,6400,80\n\
INSERT INTO audit_log VALUES (1),200,600.00,3.000,200,0,200\n\
SELECT count(*) FROM order_item,50,250.00,5.000,50,500,10"
}
fn sample_json() -> &'static str {
r#"[
{
"query": "SELECT * FROM order_item WHERE order_id = 42",
"calls": 1500,
"total_exec_time_ms": 4500.50,
"mean_exec_time_ms": 3.0,
"rows": 1500,
"shared_blks_hit": 12000,
"shared_blks_read": 150
},
{
"query": "SELECT * FROM orders WHERE id = 1 AND status = 'active'",
"calls": 800,
"total_exec_time_ms": 2400.0,
"mean_exec_time_ms": 3.0,
"rows": 800,
"shared_blks_hit": 6400,
"shared_blks_read": 80
},
{
"query": "INSERT INTO audit_log VALUES (1)",
"calls": 200,
"total_exec_time_ms": 600.0,
"mean_exec_time_ms": 3.0,
"rows": 200,
"shared_blks_hit": 0,
"shared_blks_read": 200
},
{
"query": "SELECT count(*) FROM order_item",
"calls": 50,
"total_exec_time_ms": 250.0,
"mean_exec_time_ms": 5.0,
"rows": 50,
"shared_blks_hit": 500,
"shared_blks_read": 10
}
]"#
}
#[test]
fn detect_format_csv() {
assert_eq!(
detect_pg_stat_format(b"query,calls,total_exec_time"),
PgStatFormat::Csv
);
}
#[test]
fn detect_format_json_array() {
assert_eq!(
detect_pg_stat_format(b"[{\"query\": \"SELECT 1\"}]"),
PgStatFormat::Json
);
}
#[test]
fn detect_format_json_with_whitespace() {
assert_eq!(
detect_pg_stat_format(b" \n [{\"query\": \"SELECT 1\"}]"),
PgStatFormat::Json
);
}
#[test]
fn detect_format_empty_defaults_csv() {
assert_eq!(detect_pg_stat_format(b""), PgStatFormat::Csv);
}
#[test]
fn parse_csv_basic() {
let entries = parse_pg_stat(sample_csv().as_bytes(), 1_048_576).unwrap();
assert_eq!(entries.len(), 4);
assert_eq!(entries[0].calls, 1500);
assert!((entries[0].total_exec_time_ms - 4500.50).abs() < f64::EPSILON);
assert_eq!(entries[0].rows, 1500);
assert_eq!(entries[0].shared_blks_hit, 12000);
}
#[test]
fn parse_csv_quoted_field() {
let entries = parse_pg_stat(sample_csv().as_bytes(), 1_048_576).unwrap();
assert!(entries[1].query.contains("status = 'active'"));
}
#[test]
fn parse_csv_normalization_applied() {
let entries = parse_pg_stat(sample_csv().as_bytes(), 1_048_576).unwrap();
assert_eq!(
entries[0].normalized_template,
"SELECT * FROM order_item WHERE order_id = ?"
);
}
#[test]
fn parse_csv_empty_input() {
let result = parse_pg_stat(b"", 1_048_576);
assert!(matches!(result, Err(PgStatError::EmptyInput)));
}
#[test]
fn parse_csv_whitespace_only() {
let result = parse_pg_stat(b" \n \n ", 1_048_576);
assert!(matches!(result, Err(PgStatError::EmptyInput)));
}
#[test]
fn parse_csv_header_only() {
let result = parse_pg_stat(b"query,calls,total_exec_time,mean_exec_time\n", 1_048_576);
assert!(matches!(result, Err(PgStatError::EmptyInput)));
}
#[test]
fn parse_csv_missing_column() {
let result = parse_pg_stat(b"query,calls\nSELECT 1,100", 1_048_576);
assert!(matches!(result, Err(PgStatError::MissingColumn(_))));
}
#[test]
fn parse_csv_oversized_payload() {
let result = parse_pg_stat(sample_csv().as_bytes(), 10);
assert!(matches!(result, Err(PgStatError::PayloadTooLarge { .. })));
}
#[test]
fn parse_csv_escaped_quotes() {
let csv = "query,calls,total_exec_time,mean_exec_time\n\
\"SELECT * FROM t WHERE name = \"\"O'Brien\"\"\",100,500.0,5.0";
let entries = parse_pg_stat(csv.as_bytes(), 1_048_576).unwrap();
assert!(entries[0].query.contains("O'Brien"));
}
#[test]
fn parse_json_basic() {
let entries = parse_pg_stat(sample_json().as_bytes(), 1_048_576).unwrap();
assert_eq!(entries.len(), 4);
assert_eq!(entries[0].calls, 1500);
}
#[test]
fn parse_json_normalization_applied() {
let entries = parse_pg_stat(sample_json().as_bytes(), 1_048_576).unwrap();
assert_eq!(
entries[0].normalized_template,
"SELECT * FROM order_item WHERE order_id = ?"
);
}
#[test]
fn parse_json_empty_array() {
let result = parse_pg_stat(b"[]", 1_048_576);
assert!(matches!(result, Err(PgStatError::EmptyInput)));
}
#[test]
fn parse_json_invalid() {
let result = parse_pg_stat(b"[{invalid json}]", 1_048_576);
assert!(matches!(result, Err(PgStatError::JsonParse(_))));
}
#[test]
fn parse_json_field_alias() {
let json = r#"[{
"query": "SELECT 1",
"calls": 10,
"total_exec_time": 100.0,
"mean_exec_time": 10.0,
"rows": 10
}]"#;
let entries = parse_pg_stat(json.as_bytes(), 1_048_576).unwrap();
assert!((entries[0].total_exec_time_ms - 100.0).abs() < f64::EPSILON);
}
#[test]
fn rank_by_total_time() {
let entries = parse_pg_stat(sample_csv().as_bytes(), 1_048_576).unwrap();
let report = rank_pg_stat(&entries, 2);
assert_eq!(report.total_entries, 4);
assert_eq!(report.top_n, 2);
let by_time = &report.rankings[0];
assert_eq!(by_time.label, "top by total_exec_time");
assert_eq!(by_time.entries.len(), 2);
assert!(by_time.entries[0].total_exec_time_ms >= by_time.entries[1].total_exec_time_ms);
}
#[test]
fn rank_by_calls() {
let entries = parse_pg_stat(sample_csv().as_bytes(), 1_048_576).unwrap();
let report = rank_pg_stat(&entries, 10);
let by_calls = &report.rankings[1];
assert_eq!(by_calls.label, "top by calls");
assert!(by_calls.entries[0].calls >= by_calls.entries[1].calls);
}
#[test]
fn rank_by_mean_time() {
let entries = parse_pg_stat(sample_csv().as_bytes(), 1_048_576).unwrap();
let report = rank_pg_stat(&entries, 10);
let by_mean = &report.rankings[2];
assert_eq!(by_mean.label, "top by mean_exec_time");
assert!(by_mean.entries[0].mean_exec_time_ms >= by_mean.entries[1].mean_exec_time_ms);
}
#[test]
fn rank_top_n_limits_output() {
let entries = parse_pg_stat(sample_csv().as_bytes(), 1_048_576).unwrap();
let report = rank_pg_stat(&entries, 1);
for ranking in &report.rankings {
assert_eq!(ranking.entries.len(), 1);
}
}
#[test]
fn rank_empty_entries() {
let report = rank_pg_stat(&[], 10);
assert_eq!(report.total_entries, 0);
for ranking in &report.rankings {
assert!(ranking.entries.is_empty());
}
}
fn make_finding(template: &str) -> Finding {
Finding {
finding_type: FindingType::NPlusOneSql,
severity: Severity::Warning,
trace_id: "trace-1".to_string(),
service: "order-svc".to_string(),
source_endpoint: "POST /api/orders/42/submit".to_string(),
pattern: Pattern {
template: template.to_string(),
occurrences: 6,
window_ms: 200,
distinct_params: 6,
},
suggestion: "batch".to_string(),
first_timestamp: "2025-07-10T14:32:01.000Z".to_string(),
last_timestamp: "2025-07-10T14:32:01.250Z".to_string(),
green_impact: None,
confidence: Confidence::default(),
code_location: None,
suggested_fix: None,
}
}
#[test]
fn cross_reference_marks_matching_templates() {
let mut entries = parse_pg_stat(sample_csv().as_bytes(), 1_048_576).unwrap();
let findings = vec![make_finding("SELECT * FROM order_item WHERE order_id = ?")];
cross_reference(&mut entries, &findings);
assert!(entries[0].seen_in_traces);
assert!(!entries[1].seen_in_traces);
}
#[test]
fn cross_reference_no_matches() {
let mut entries = parse_pg_stat(sample_csv().as_bytes(), 1_048_576).unwrap();
let findings = vec![make_finding("SELECT * FROM nonexistent WHERE id = ?")];
cross_reference(&mut entries, &findings);
assert!(entries.iter().all(|e| !e.seen_in_traces));
}
#[test]
fn cross_reference_empty_findings() {
let mut entries = parse_pg_stat(sample_csv().as_bytes(), 1_048_576).unwrap();
cross_reference(&mut entries, &[]);
assert!(entries.iter().all(|e| !e.seen_in_traces));
}
#[test]
fn csv_row_with_embedded_comma() {
let row = r#""SELECT a, b FROM t",100,500.0,5.0"#;
let fields = parse_csv_row(row);
assert_eq!(fields[0], "SELECT a, b FROM t");
assert_eq!(fields[1], "100");
}
#[test]
fn csv_row_simple() {
let row = "a,b,c,d";
let fields = parse_csv_row(row);
assert_eq!(fields, vec!["a", "b", "c", "d"]);
}
#[test]
fn csv_row_with_utf8_content() {
let row = "\"SELECT * FROM café WHERE naïve = 'résumé'\",100,500.0,5.0";
let fields = parse_csv_row(row);
assert_eq!(fields[0], "SELECT * FROM café WHERE naïve = 'résumé'");
}
#[test]
fn parse_invalid_utf8_returns_error() {
let data: &[u8] = &[0xFF, 0xFE, 0x00, 0x01];
let result = parse_pg_stat(data, 1_048_576);
assert!(matches!(result, Err(PgStatError::CsvParse { line: 0, .. })));
}
#[test]
fn parse_csv_invalid_number_returns_error() {
let csv = "query,calls,total_exec_time,mean_exec_time\nSELECT 1,abc,500.0,5.0";
let result = parse_pg_stat(csv.as_bytes(), 1_048_576);
assert!(matches!(result, Err(PgStatError::CsvParse { line: 2, .. })));
}
#[cfg(any(feature = "daemon", feature = "tempo"))]
#[test]
fn parse_prometheus_response_basic() {
let json = br#"{
"status": "success",
"data": {
"resultType": "vector",
"result": [
{
"metric": {
"__name__": "pg_stat_statements_seconds_total",
"query": "SELECT * FROM orders WHERE id = $1"
},
"value": [1720000000, "4.5"]
},
{
"metric": {
"__name__": "pg_stat_statements_seconds_total",
"query": "INSERT INTO audit_log VALUES ($1)"
},
"value": [1720000000, "1.2"]
}
]
}
}"#;
let entries = parse_prometheus_response(json).unwrap();
assert_eq!(entries.len(), 2);
assert!((entries[0].total_exec_time_ms - 4500.0).abs() < f64::EPSILON);
assert!((entries[1].total_exec_time_ms - 1200.0).abs() < f64::EPSILON);
assert!(entries[0].normalized_template.contains('?'));
}
#[cfg(any(feature = "daemon", feature = "tempo"))]
#[test]
fn parse_prometheus_response_empty_result() {
let json = br#"{"status":"success","data":{"resultType":"vector","result":[]}}"#;
let entries = parse_prometheus_response(json).unwrap();
assert!(entries.is_empty());
}
#[cfg(any(feature = "daemon", feature = "tempo"))]
#[test]
fn parse_prometheus_response_invalid_json() {
let result = parse_prometheus_response(b"not json");
assert!(matches!(result, Err(PgStatError::PrometheusFormat(_))));
}
#[cfg(any(feature = "daemon", feature = "tempo"))]
#[test]
fn validate_endpoint_accepts_http_and_https() {
assert!(validate_prometheus_endpoint("http://prometheus:9090").is_ok());
assert!(validate_prometheus_endpoint("https://prometheus.example.com").is_ok());
assert!(validate_prometheus_endpoint("http://127.0.0.1:9090").is_ok());
}
#[cfg(any(feature = "daemon", feature = "tempo"))]
#[test]
fn validate_endpoint_rejects_malformed_url() {
let result = validate_prometheus_endpoint("not a url");
assert!(matches!(result, Err(PgStatError::PrometheusRequest(_))));
}
#[cfg(any(feature = "daemon", feature = "tempo"))]
#[test]
fn validate_endpoint_rejects_userinfo() {
let result = validate_prometheus_endpoint("http://user:pass@prometheus:9090");
assert!(
matches!(result, Err(PgStatError::PrometheusRequest(msg)) if msg.contains("credentials")),
"must reject userinfo in URL"
);
}
#[cfg(any(feature = "daemon", feature = "tempo"))]
#[test]
fn validate_endpoint_rejects_non_http_scheme() {
let result = validate_prometheus_endpoint("ftp://prometheus:9090");
assert!(
matches!(result, Err(PgStatError::PrometheusRequest(msg)) if msg.contains("scheme")),
"must reject non-http(s) schemes"
);
}
}