use crate::collectors::{Collector, util::get_excluded_databases};
use anyhow::Result;
use futures::future::BoxFuture;
use prometheus::{Gauge, IntGauge, IntGaugeVec, Opts, Registry};
use sqlx::{PgPool, Row};
use std::collections::{HashMap, HashSet};
use tracing::{debug, info_span, instrument};
use tracing_futures::Instrument as _;
#[derive(Clone)]
pub struct ConnectionsCollector {
count_by_state: IntGaugeVec, active_connections: IntGaugeVec, idle_connections: IntGaugeVec, waiting_connections: IntGaugeVec, blocked_connections: IntGaugeVec,
max_connections: IntGauge, used_connections: IntGauge, utilization_ratio: Gauge, available_connections: IntGauge,
idle_in_transaction: IntGaugeVec, idle_in_transaction_aborted: IntGaugeVec,
connections_by_application: IntGaugeVec,
idle_age_1m: IntGaugeVec, idle_age_5m: IntGaugeVec, idle_age_15m: IntGaugeVec, idle_age_1h: IntGaugeVec, idle_age_old: IntGaugeVec, }
impl Default for ConnectionsCollector {
fn default() -> Self {
Self::new()
}
}
impl ConnectionsCollector {
pub fn new() -> Self {
let count_by_state = IntGaugeVec::new(
Opts::new(
"pg_stat_activity_count",
"Number of client backends by database and state (from pg_stat_activity)",
),
&["datname", "state"],
)
.expect("Failed to create pg_stat_activity_count");
let active_connections = IntGaugeVec::new(
Opts::new(
"pg_stat_activity_active_connections",
"Number of active client connections per database",
),
&["datname"],
)
.expect("Failed to create pg_stat_activity_active_connections");
let idle_connections = IntGaugeVec::new(
Opts::new(
"pg_stat_activity_idle_connections",
"Number of idle client connections per database",
),
&["datname"],
)
.expect("Failed to create pg_stat_activity_idle_connections");
let waiting_connections = IntGaugeVec::new(
Opts::new(
"pg_stat_activity_waiting_connections",
"Number of client connections currently waiting (wait_event IS NOT NULL) per database",
),
&["datname"],
)
.expect("Failed to create pg_stat_activity_waiting_connections");
let blocked_connections = IntGaugeVec::new(
Opts::new(
"pg_stat_activity_blocked_connections",
"Number of client connections blocked by locks per database",
),
&["datname"],
)
.expect("Failed to create pg_stat_activity_blocked_connections");
let max_connections = IntGauge::with_opts(Opts::new(
"pg_stat_activity_max_connections",
"Maximum allowed connections (from pg_settings.max_connections)",
))
.expect("Failed to create pg_stat_activity_max_connections");
let used_connections = IntGauge::with_opts(Opts::new(
"pg_stat_activity_used_connections",
"Current number of connections in use (all client backends)",
))
.expect("Failed to create pg_stat_activity_used_connections");
let utilization_ratio = Gauge::with_opts(Opts::new(
"pg_stat_activity_utilization_ratio",
"Connection pool utilization ratio (used/max, 0.0-1.0). Alert when >0.8",
))
.expect("Failed to create pg_stat_activity_utilization_ratio");
let available_connections = IntGauge::with_opts(Opts::new(
"pg_stat_activity_available_connections",
"Number of connections still available (max - used)",
))
.expect("Failed to create pg_stat_activity_available_connections");
let idle_in_transaction = IntGaugeVec::new(
Opts::new(
"pg_stat_activity_idle_in_transaction",
"Connections idle in transaction (holding locks/snapshots). Should be ~0 in healthy systems.",
),
&["datname"],
)
.expect("Failed to create pg_stat_activity_idle_in_transaction");
let idle_in_transaction_aborted = IntGaugeVec::new(
Opts::new(
"pg_stat_activity_idle_in_transaction_aborted",
"Connections idle in aborted transaction (failed tx not cleaned up). Critical issue.",
),
&["datname"],
)
.expect("Failed to create pg_stat_activity_idle_in_transaction_aborted");
let connections_by_application = IntGaugeVec::new(
Opts::new(
"pg_stat_activity_connections_by_application",
"Number of connections per application (identify connection hogs in K8s)",
),
&["datname", "application_name"],
)
.expect("Failed to create pg_stat_activity_connections_by_application");
let idle_age_1m = IntGaugeVec::new(
Opts::new(
"pg_stat_activity_idle_age_1m",
"Number of idle connections aged <1 minute per database",
),
&["datname"],
)
.expect("Failed to create pg_stat_activity_idle_age_1m");
let idle_age_5m = IntGaugeVec::new(
Opts::new(
"pg_stat_activity_idle_age_5m",
"Number of idle connections aged 1-5 minutes per database",
),
&["datname"],
)
.expect("Failed to create pg_stat_activity_idle_age_5m");
let idle_age_15m = IntGaugeVec::new(
Opts::new(
"pg_stat_activity_idle_age_15m",
"Number of idle connections aged 5-15 minutes per database",
),
&["datname"],
)
.expect("Failed to create pg_stat_activity_idle_age_15m");
let idle_age_1h = IntGaugeVec::new(
Opts::new(
"pg_stat_activity_idle_age_1h",
"Number of idle connections aged 15m-1h per database (investigate)",
),
&["datname"],
)
.expect("Failed to create pg_stat_activity_idle_age_1h");
let idle_age_old = IntGaugeVec::new(
Opts::new(
"pg_stat_activity_idle_age_old",
"Number of idle connections aged >1 hour per database (connection leak!)",
),
&["datname"],
)
.expect("Failed to create pg_stat_activity_idle_age_old");
Self {
count_by_state,
active_connections,
idle_connections,
waiting_connections,
blocked_connections,
max_connections,
used_connections,
utilization_ratio,
available_connections,
idle_in_transaction,
idle_in_transaction_aborted,
connections_by_application,
idle_age_1m,
idle_age_5m,
idle_age_15m,
idle_age_1h,
idle_age_old,
}
}
}
impl Collector for ConnectionsCollector {
fn name(&self) -> &'static str {
"connections"
}
#[instrument(
skip(self, registry),
level = "info",
err,
fields(collector = "connections")
)]
fn register_metrics(&self, registry: &Registry) -> Result<()> {
registry.register(Box::new(self.count_by_state.clone()))?;
registry.register(Box::new(self.active_connections.clone()))?;
registry.register(Box::new(self.idle_connections.clone()))?;
registry.register(Box::new(self.waiting_connections.clone()))?;
registry.register(Box::new(self.blocked_connections.clone()))?;
registry.register(Box::new(self.max_connections.clone()))?;
registry.register(Box::new(self.used_connections.clone()))?;
registry.register(Box::new(self.utilization_ratio.clone()))?;
registry.register(Box::new(self.available_connections.clone()))?;
registry.register(Box::new(self.idle_in_transaction.clone()))?;
registry.register(Box::new(self.idle_in_transaction_aborted.clone()))?;
registry.register(Box::new(self.connections_by_application.clone()))?;
registry.register(Box::new(self.idle_age_1m.clone()))?;
registry.register(Box::new(self.idle_age_5m.clone()))?;
registry.register(Box::new(self.idle_age_15m.clone()))?;
registry.register(Box::new(self.idle_age_1h.clone()))?;
registry.register(Box::new(self.idle_age_old.clone()))?;
Ok(())
}
#[instrument(
skip(self, pool),
level = "info",
err,
fields(collector="connections", otel.kind="internal")
)]
fn collect<'a>(&'a self, pool: &'a PgPool) -> BoxFuture<'a, Result<()>> {
Box::pin(async move {
let excluded: Vec<String> = get_excluded_databases().to_vec();
let max_conn_query = info_span!(
"db.query",
otel.kind = "client",
db.system = "postgresql",
db.operation = "SELECT",
db.statement = "SHOW max_connections",
db.sql.table = "pg_settings"
);
let max_conn: i64 = sqlx::query_scalar(
"SELECT setting::bigint FROM pg_settings WHERE name = 'max_connections'"
)
.fetch_one(pool)
.instrument(max_conn_query)
.await
.unwrap_or(100);
self.max_connections.set(max_conn);
let q_state = info_span!(
"db.query",
otel.kind = "client",
db.system = "postgresql",
db.operation = "SELECT",
db.statement = "SELECT datname, state, COUNT(*) FROM pg_stat_activity (filtered)",
db.sql.table = "pg_stat_activity"
);
let state_rows = sqlx::query(
r#"
SELECT
datname,
COALESCE(state, 'unknown') AS state,
COUNT(*)::bigint AS cnt
FROM pg_stat_activity
WHERE backend_type = 'client backend'
AND pid != pg_backend_pid()
AND NOT (COALESCE(datname, '') = ANY($1))
GROUP BY datname, COALESCE(state, 'unknown')
ORDER BY datname, COALESCE(state, 'unknown')
"#,
)
.bind(&excluded)
.fetch_all(pool)
.instrument(q_state)
.await?;
let mut dbs_seen: HashSet<String> = HashSet::new();
let mut active_map: HashMap<String, i64> = HashMap::new();
let mut idle_map: HashMap<String, i64> = HashMap::new();
for row in &state_rows {
let db: String = row
.try_get::<Option<String>, _>("datname")?
.unwrap_or_else(|| "[unknown]".to_string());
let state: String = row.try_get::<String, _>("state")?;
let cnt: i64 = row.try_get::<i64, _>("cnt").unwrap_or(0);
dbs_seen.insert(db.clone());
self.count_by_state
.with_label_values(&[&db, &state])
.set(cnt);
if state == "active" {
active_map.insert(db.clone(), cnt);
} else if state == "idle" {
idle_map.insert(db.clone(), cnt);
}
}
for db in &dbs_seen {
let a = *active_map.get(db).unwrap_or(&0);
let i = *idle_map.get(db).unwrap_or(&0);
self.active_connections.with_label_values(&[db]).set(a);
self.idle_connections.with_label_values(&[db]).set(i);
debug!(database=%db, active=a, idle=i, "set active/idle gauges");
}
let q_wait_block = info_span!(
"db.query",
otel.kind = "client",
db.system = "postgresql",
db.operation = "SELECT",
db.statement = "SELECT wait/blocked per db from pg_stat_activity (filtered + pg_blocking_pids)",
db.sql.table = "pg_stat_activity"
);
let wait_block_rows = sqlx::query(
r#"
SELECT
a.datname,
COUNT(*) FILTER (WHERE a.wait_event IS NOT NULL)::bigint AS waiting,
COUNT(*) FILTER (WHERE cardinality(pg_blocking_pids(a.pid)) > 0)::bigint AS blocked
FROM pg_stat_activity a
WHERE a.backend_type = 'client backend'
AND a.pid != pg_backend_pid()
AND NOT (COALESCE(a.datname, '') = ANY($1))
GROUP BY a.datname
ORDER BY a.datname
"#,
)
.bind(&excluded)
.fetch_all(pool)
.instrument(q_wait_block)
.await?;
let mut waiting_map: HashMap<String, i64> = HashMap::new();
let mut blocked_map: HashMap<String, i64> = HashMap::new();
for row in &wait_block_rows {
let db: String = row
.try_get::<Option<String>, _>("datname")?
.unwrap_or_else(|| "[unknown]".to_string());
let waiting: i64 = row.try_get::<i64, _>("waiting").unwrap_or(0);
let blocked: i64 = row.try_get::<i64, _>("blocked").unwrap_or(0);
dbs_seen.insert(db.clone());
waiting_map.insert(db.clone(), waiting);
blocked_map.insert(db.clone(), blocked);
self.waiting_connections
.with_label_values(&[&db])
.set(waiting);
self.blocked_connections
.with_label_values(&[&db])
.set(blocked);
debug!(database=%db, waiting, blocked, "set waiting/blocked gauges");
}
for db in &dbs_seen {
if !waiting_map.contains_key(db) {
self.waiting_connections.with_label_values(&[db]).set(0);
}
if !blocked_map.contains_key(db) {
self.blocked_connections.with_label_values(&[db]).set(0);
}
}
let q_detailed = info_span!(
"db.query",
otel.kind = "client",
db.system = "postgresql",
db.operation = "SELECT",
db.statement = "SELECT detailed connection info from pg_stat_activity",
db.sql.table = "pg_stat_activity"
);
let detailed_rows = sqlx::query(
r#"
SELECT
datname,
COALESCE(state, 'unknown') AS state,
application_name,
EXTRACT(EPOCH FROM (now() - state_change))::bigint AS state_duration_seconds,
COUNT(*)::bigint AS cnt
FROM pg_stat_activity
WHERE backend_type = 'client backend'
AND pid != pg_backend_pid()
AND NOT (COALESCE(datname, '') = ANY($1))
GROUP BY datname, COALESCE(state, 'unknown'), application_name, EXTRACT(EPOCH FROM (now() - state_change))::bigint
"#,
)
.bind(&excluded)
.fetch_all(pool)
.instrument(q_detailed)
.await?;
let mut total_connections: i64 = 0;
let mut idle_in_tx_map: HashMap<String, i64> = HashMap::new();
let mut idle_in_tx_aborted_map: HashMap<String, i64> = HashMap::new();
let mut app_conn_map: HashMap<(String, String), i64> = HashMap::new();
let mut idle_1m_map: HashMap<String, i64> = HashMap::new();
let mut idle_5m_map: HashMap<String, i64> = HashMap::new();
let mut idle_15m_map: HashMap<String, i64> = HashMap::new();
let mut idle_1h_map: HashMap<String, i64> = HashMap::new();
let mut idle_old_map: HashMap<String, i64> = HashMap::new();
for row in &detailed_rows {
let db: String = row
.try_get::<Option<String>, _>("datname")?
.unwrap_or_else(|| "[unknown]".to_string());
let state: String = row.try_get::<String, _>("state")?;
let app_name: String = row.try_get::<Option<String>, _>("application_name")?
.unwrap_or_else(|| "".to_string());
let state_duration: i64 = row.try_get::<i64, _>("state_duration_seconds").unwrap_or(0);
let cnt: i64 = row.try_get::<i64, _>("cnt").unwrap_or(0);
total_connections += cnt;
dbs_seen.insert(db.clone());
if state == "idle in transaction" {
*idle_in_tx_map.entry(db.clone()).or_insert(0) += cnt;
} else if state == "idle in transaction (aborted)" {
*idle_in_tx_aborted_map.entry(db.clone()).or_insert(0) += cnt;
}
let app_label = if app_name.is_empty() {
"[unknown]".to_string()
} else {
app_name
};
let key = (db.clone(), app_label);
*app_conn_map.entry(key).or_insert(0) += cnt;
if state == "idle" {
if state_duration < 60 {
*idle_1m_map.entry(db.clone()).or_insert(0) += cnt;
} else if state_duration < 300 {
*idle_5m_map.entry(db.clone()).or_insert(0) += cnt;
} else if state_duration < 900 {
*idle_15m_map.entry(db.clone()).or_insert(0) += cnt;
} else if state_duration < 3600 {
*idle_1h_map.entry(db.clone()).or_insert(0) += cnt;
} else {
*idle_old_map.entry(db.clone()).or_insert(0) += cnt;
}
}
}
self.used_connections.set(total_connections);
let utilization = if max_conn > 0 {
total_connections as f64 / max_conn as f64
} else {
0.0
};
self.utilization_ratio.set(utilization);
let available = std::cmp::max(0, max_conn - total_connections);
self.available_connections.set(available);
debug!(
total_connections,
max_connections = max_conn,
utilization_ratio = utilization,
available_connections = available,
"set pool saturation metrics"
);
for db in &dbs_seen {
let idle_in_tx = *idle_in_tx_map.get(db).unwrap_or(&0);
let idle_in_tx_aborted = *idle_in_tx_aborted_map.get(db).unwrap_or(&0);
self.idle_in_transaction.with_label_values(&[db]).set(idle_in_tx);
self.idle_in_transaction_aborted.with_label_values(&[db]).set(idle_in_tx_aborted);
let idle_1m = *idle_1m_map.get(db).unwrap_or(&0);
let idle_5m = *idle_5m_map.get(db).unwrap_or(&0);
let idle_15m = *idle_15m_map.get(db).unwrap_or(&0);
let idle_1h = *idle_1h_map.get(db).unwrap_or(&0);
let idle_old = *idle_old_map.get(db).unwrap_or(&0);
self.idle_age_1m.with_label_values(&[db]).set(idle_1m);
self.idle_age_5m.with_label_values(&[db]).set(idle_5m);
self.idle_age_15m.with_label_values(&[db]).set(idle_15m);
self.idle_age_1h.with_label_values(&[db]).set(idle_1h);
self.idle_age_old.with_label_values(&[db]).set(idle_old);
debug!(
database = %db,
idle_in_transaction = idle_in_tx,
idle_in_transaction_aborted = idle_in_tx_aborted,
idle_1m, idle_5m, idle_15m, idle_1h, idle_old,
"set pool detail metrics"
);
}
for ((db, app_name), cnt) in &app_conn_map {
self.connections_by_application
.with_label_values(&[db, app_name])
.set(*cnt);
}
Ok(())
})
}
}