use chrono::Utc;
use std::fmt::Write;
use crate::features::connection_pooler::ConnectionPoolSnapshot;
/// Running totals of connection counts summed over every observed pool.
///
/// A fresh value (via [`Default`]) has every total at zero; totals are
/// accumulated with [`PoolMetricsAggregate::observe`].
#[derive(Clone, Debug, Default)]
pub struct PoolMetricsAggregate {
    /// Sum of `pool_size` over all observed snapshots.
    pub total_size: u64,
    /// Sum of `idle_connections` over all observed snapshots.
    pub total_idle: u64,
    /// Sum of `active_connections` over all observed snapshots.
    pub total_active: u64,
    /// Sum of `max_connections` over all observed snapshots.
    pub total_max: u64,
}
impl PoolMetricsAggregate {
    /// Folds one pool snapshot into the running totals.
    ///
    /// Each snapshot counter is widened to `u64` and added with saturating
    /// arithmetic, so a total clamps at `u64::MAX` instead of wrapping.
    pub fn observe(&mut self, snapshot: &ConnectionPoolSnapshot) {
        let accumulate = |total: &mut u64, delta: u64| *total = total.saturating_add(delta);

        accumulate(&mut self.total_size, u64::from(snapshot.pool_size));
        accumulate(&mut self.total_idle, u64::from(snapshot.idle_connections));
        accumulate(&mut self.total_active, u64::from(snapshot.active_connections));
        accumulate(&mut self.total_max, u64::from(snapshot.max_connections));
    }

    /// Appends the aggregate metric families to `body` in Prometheus text format.
    ///
    /// Four families are written, each preceded by its `# HELP` / `# TYPE`
    /// header: per-state connection totals, the summed `max_connections`, the
    /// overall utilization ratio (active / size, `0` when size is zero) and
    /// the remaining headroom (max - size, saturating at zero).
    pub fn write_totals(&self, body: &mut String) {
        const CONNECTIONS: &str = "athena_pg_pool_connections_total";
        const MAX_CONNECTIONS: &str = "athena_pg_pool_max_connections_total";
        const UTILIZATION: &str = "athena_pg_pool_overall_utilization_ratio";
        const HEADROOM: &str = "athena_pg_pool_headroom_total";

        write_help_and_type(
            body,
            CONNECTIONS,
            "Aggregated Postgres pool connection counts across all registered client pools.",
            "gauge",
        );
        let per_state = [
            ("state=\"size\"", self.total_size),
            ("state=\"idle\"", self.total_idle),
            ("state=\"active\"", self.total_active),
        ];
        for (state_label, count) in per_state {
            write_with_labels(body, CONNECTIONS, state_label, count);
        }

        write_help_and_type(
            body,
            MAX_CONNECTIONS,
            "Sum of configured max_connections across all registered client pools.",
            "gauge",
        );
        write_value(body, MAX_CONNECTIONS, self.total_max);

        write_help_and_type(
            body,
            UTILIZATION,
            "Overall utilization ratio (active/size) across all Postgres pools.",
            "gauge",
        );
        // Guard the division: with no connections at all the ratio is reported as zero.
        let utilization: f64 = match self.total_size {
            0 => 0.0,
            size => self.total_active as f64 / size as f64,
        };
        write_value(body, UTILIZATION, format_args!("{utilization:.6}"));

        write_help_and_type(
            body,
            HEADROOM,
            "Remaining provisioned capacity across all pools (sum(max) - sum(size)).",
            "gauge",
        );
        write_value(
            body,
            HEADROOM,
            self.total_max.saturating_sub(self.total_size),
        );
    }
}
impl ConnectionPoolSnapshot {
    /// Builds the label list for one `athena_pg_pool_connections` sample.
    ///
    /// Appends `state="<state>"` to the caller-supplied `labels`. When
    /// `labels` is empty the separator comma is omitted, because a label list
    /// that starts with a comma (`{,state="size"}`) is not valid Prometheus
    /// text format.
    fn labels_with_state(labels: &str, state: &str) -> String {
        if labels.is_empty() {
            format!("state=\"{state}\"")
        } else {
            format!("{labels},state=\"{state}\"")
        }
    }

    /// Appends this snapshot's per-pool samples to `body` in Prometheus text format.
    ///
    /// `labels` is the pre-rendered label list, without the surrounding
    /// braces, attached to every sample; it may be empty. The samples written
    /// are: `up` (always `1`), `closed`, the three per-state connection
    /// counts, `max_connections`, the utilization / idle / saturation ratios
    /// (six decimal places, `0` when the denominator is zero), the headroom
    /// (max - size, saturating at zero) and the snapshot age in seconds.
    ///
    /// No `# HELP` / `# TYPE` headers are written here; see
    /// [`ConnectionPoolSnapshot::write_prometheus_preamble`].
    pub fn write_prometheus(&self, body: &mut String, labels: &str) {
        // Ratio with a zero-denominator guard, shared by the three ratio gauges.
        fn ratio(numerator: u64, denominator: u64) -> f64 {
            if denominator == 0 {
                0.0
            } else {
                numerator as f64 / denominator as f64
            }
        }

        let size = u64::from(self.pool_size);
        let idle = u64::from(self.idle_connections);
        let active = u64::from(self.active_connections);
        let max = u64::from(self.max_connections);

        write_with_labels(body, "athena_pg_pool_up", labels, 1);
        write_with_labels(
            body,
            "athena_pg_pool_closed",
            labels,
            u8::from(self.is_closed),
        );
        for (state, count) in [("size", size), ("idle", idle), ("active", active)] {
            write_with_labels(
                body,
                "athena_pg_pool_connections",
                &Self::labels_with_state(labels, state),
                count,
            );
        }
        write_with_labels(body, "athena_pg_pool_max_connections", labels, max);

        write_with_labels(
            body,
            "athena_pg_pool_connection_utilization_ratio",
            labels,
            format_args!("{:.6}", ratio(active, size)),
        );
        write_with_labels(
            body,
            "athena_pg_pool_idle_ratio",
            labels,
            format_args!("{:.6}", ratio(idle, size)),
        );
        write_with_labels(
            body,
            "athena_pg_pool_saturation_ratio",
            labels,
            format_args!("{:.6}", ratio(size, max)),
        );
        write_with_labels(
            body,
            "athena_pg_pool_headroom",
            labels,
            max.saturating_sub(size),
        );

        // Clamp at zero so a `recorded_at` that lies after the current clock
        // reading does not produce a negative age.
        let age_millis = (Utc::now() - self.recorded_at).num_milliseconds().max(0);
        let age_seconds: f64 = age_millis as f64 / 1000.0;
        write_with_labels(
            body,
            "athena_pg_pool_snapshot_age_seconds",
            labels,
            format_args!("{age_seconds:.6}"),
        );
    }

    /// Appends the placeholder samples for a pool that is reported as absent.
    ///
    /// Writes the same sample names as
    /// [`ConnectionPoolSnapshot::write_prometheus`], with
    /// `athena_pg_pool_up` = `0`, `athena_pg_pool_closed` = `1` and every
    /// other gauge `0`. `labels` follows the same contract and may be empty.
    pub fn write_prometheus_absent(body: &mut String, labels: &str) {
        write_with_labels(body, "athena_pg_pool_up", labels, 0);
        write_with_labels(body, "athena_pg_pool_closed", labels, 1);
        for state in ["size", "idle", "active"] {
            write_with_labels(
                body,
                "athena_pg_pool_connections",
                &Self::labels_with_state(labels, state),
                0,
            );
        }
        for name in [
            "athena_pg_pool_max_connections",
            "athena_pg_pool_connection_utilization_ratio",
            "athena_pg_pool_idle_ratio",
            "athena_pg_pool_saturation_ratio",
            "athena_pg_pool_headroom",
            "athena_pg_pool_snapshot_age_seconds",
        ] {
            write_with_labels(body, name, labels, 0);
        }
    }

    /// Appends the `# HELP` / `# TYPE` headers for every per-pool metric family.
    ///
    /// All families are declared as gauges, in the same order in which the
    /// sample writers emit them.
    pub fn write_prometheus_preamble(body: &mut String) {
        let families: [(&str, &str); 9] = [
            (
                "athena_pg_pool_up",
                "Whether a registered Postgres client currently has a live SQLx pool handle.",
            ),
            (
                "athena_pg_pool_closed",
                "Whether a registered Postgres client SQLx pool is closed.",
            ),
            (
                "athena_pg_pool_connections",
                "Connection counts for registered Postgres client pools by state.",
            ),
            (
                "athena_pg_pool_max_connections",
                "Configured upper bound on connections for each Postgres client pool.",
            ),
            (
                "athena_pg_pool_connection_utilization_ratio",
                "Utilization ratio (active/size) for each Postgres client pool. Sustained values near 1.0 with growing insert latency often indicate pool starvation or slow reconnect after network blips.",
            ),
            (
                "athena_pg_pool_idle_ratio",
                "Idle ratio (idle/size) for each Postgres client pool.",
            ),
            (
                "athena_pg_pool_saturation_ratio",
                "Saturation ratio (size/max_connections) showing how close each pool is to its configured ceiling.",
            ),
            (
                "athena_pg_pool_headroom",
                "Remaining provisionable connections for each pool (max_connections - size).",
            ),
            (
                "athena_pg_pool_snapshot_age_seconds",
                "Age in seconds of the most recent ConnectionPoolSnapshot for each client (0 means just captured; large values indicate a stalled monitor).",
            ),
        ];
        for (name, help) in families {
            write_help_and_type(body, name, help, "gauge");
        }
    }
}
/// Appends the `# HELP` and `# TYPE` header lines for one metric family.
///
/// The Prometheus text format requires backslashes and line feeds inside
/// HELP text to be escaped as `\\` and `\n`; an unescaped line feed would
/// split the header and corrupt the rest of the exposition body. Help text
/// containing neither character is written unchanged.
///
/// * `name` - metric family name, written verbatim.
/// * `help` - human-readable description, escaped as described above.
/// * `metric_type` - value for the `# TYPE` line (e.g. `gauge`), written verbatim.
fn write_help_and_type(body: &mut String, name: &str, help: &str, metric_type: &str) {
    body.push_str("# HELP ");
    body.push_str(name);
    body.push(' ');
    for ch in help.chars() {
        match ch {
            '\\' => body.push_str("\\\\"),
            '\n' => body.push_str("\\n"),
            other => body.push(other),
        }
    }
    body.push('\n');
    // Formatting plain `&str` values into a `String` cannot fail, so the result is discarded.
    let _ = writeln!(body, "# TYPE {name} {metric_type}");
}
/// Appends one unlabelled sample line, `<name> <value>`, to `body`.
///
/// `value` is rendered through its `Display` implementation.
fn write_value(body: &mut String, name: &str, value: impl std::fmt::Display) {
    // The write can only fail if `value`'s `Display` impl reports an error;
    // the result is deliberately discarded.
    let _ = body.write_fmt(format_args!("{name} {value}\n"));
}
/// Appends one labelled sample line, `<name>{<labels>} <value>`, to `body`.
///
/// `labels` is inserted verbatim between the braces, so the caller must pass
/// an already-rendered label list. `value` is rendered through its `Display`
/// implementation.
fn write_with_labels(body: &mut String, name: &str, labels: &str, value: impl std::fmt::Display) {
    // The write can only fail if `value`'s `Display` impl reports an error;
    // the result is deliberately discarded.
    let _ = body.write_fmt(format_args!("{name}{{{labels}}} {value}\n"));
}