use crate::collectors::Collector;
use anyhow::Result;
use futures::future::BoxFuture;
use prometheus::{IntGaugeVec, Opts, Registry};
use sqlx::{PgPool, Row};
use std::collections::{HashMap, HashSet};
use tracing::{debug, info_span, instrument};
use tracing_futures::Instrument as _;
#[derive(Clone)]
pub struct ConnectionsCollector {
count_by_state: IntGaugeVec,
active_connections: IntGaugeVec, idle_connections: IntGaugeVec, waiting_connections: IntGaugeVec, blocked_connections: IntGaugeVec, }
impl Default for ConnectionsCollector {
fn default() -> Self {
Self::new()
}
}
impl ConnectionsCollector {
pub fn new() -> Self {
let count_by_state = IntGaugeVec::new(
Opts::new(
"pg_stat_activity_count",
"Number of client backends by database and state (from pg_stat_activity)",
),
&["datname", "state"],
)
.expect("Failed to create pg_stat_activity_count");
let active_connections = IntGaugeVec::new(
Opts::new(
"pg_stat_activity_active_connections",
"Number of active client connections per database",
),
&["datname"],
)
.expect("Failed to create pg_stat_activity_active_connections");
let idle_connections = IntGaugeVec::new(
Opts::new(
"pg_stat_activity_idle_connections",
"Number of idle client connections per database",
),
&["datname"],
)
.expect("Failed to create pg_stat_activity_idle_connections");
let waiting_connections = IntGaugeVec::new(
Opts::new(
"pg_stat_activity_waiting_connections",
"Number of client connections currently waiting (wait_event IS NOT NULL) per database",
),
&["datname"],
)
.expect("Failed to create pg_stat_activity_waiting_connections");
let blocked_connections = IntGaugeVec::new(
Opts::new(
"pg_stat_activity_blocked_connections",
"Number of client connections blocked by locks per database",
),
&["datname"],
)
.expect("Failed to create pg_stat_activity_blocked_connections");
Self {
count_by_state,
active_connections,
idle_connections,
waiting_connections,
blocked_connections,
}
}
}
impl Collector for ConnectionsCollector {
fn name(&self) -> &'static str {
"connections"
}
#[instrument(
skip(self, registry),
level = "info",
err,
fields(collector = "connections")
)]
fn register_metrics(&self, registry: &Registry) -> Result<()> {
registry.register(Box::new(self.count_by_state.clone()))?;
registry.register(Box::new(self.active_connections.clone()))?;
registry.register(Box::new(self.idle_connections.clone()))?;
registry.register(Box::new(self.waiting_connections.clone()))?;
registry.register(Box::new(self.blocked_connections.clone()))?;
Ok(())
}
#[instrument(
skip(self, pool),
level = "info",
err,
fields(collector="connections", otel.kind="internal")
)]
fn collect<'a>(&'a self, pool: &'a PgPool) -> BoxFuture<'a, Result<()>> {
Box::pin(async move {
let q_state = info_span!(
"db.query",
otel.kind = "client",
db.system = "postgresql",
db.operation = "SELECT",
db.statement =
"SELECT datname, state, COUNT(*) FROM pg_stat_activity GROUP BY datname,state",
db.sql.table = "pg_stat_activity"
);
let state_rows = sqlx::query(
r#"
SELECT
datname,
COALESCE(state, 'unknown') AS state,
COUNT(*)::bigint AS cnt
FROM pg_stat_activity
WHERE backend_type = 'client backend'
AND pid != pg_backend_pid()
GROUP BY datname, COALESCE(state, 'unknown')
ORDER BY datname, COALESCE(state, 'unknown')
"#,
)
.fetch_all(pool)
.instrument(q_state)
.await?;
let mut dbs_seen: HashSet<String> = HashSet::new();
let mut active_map: HashMap<String, i64> = HashMap::new();
let mut idle_map: HashMap<String, i64> = HashMap::new();
for row in &state_rows {
let db: String = row
.try_get::<Option<String>, _>("datname")?
.unwrap_or_else(|| "[unknown]".to_string());
let state: String = row.try_get::<String, _>("state")?;
let cnt: i64 = row.try_get::<i64, _>("cnt").unwrap_or(0);
dbs_seen.insert(db.clone());
self.count_by_state
.with_label_values(&[&db, &state])
.set(cnt);
if state.eq_ignore_ascii_case("active") {
active_map.insert(db.clone(), cnt);
} else if state.eq_ignore_ascii_case("idle") {
idle_map.insert(db.clone(), cnt);
}
}
for db in &dbs_seen {
let a = *active_map.get(db).unwrap_or(&0);
let i = *idle_map.get(db).unwrap_or(&0);
self.active_connections.with_label_values(&[db]).set(a);
self.idle_connections.with_label_values(&[db]).set(i);
debug!(database=%db, active=a, idle=i, "set active/idle gauges");
}
let q_wait_block = info_span!(
"db.query",
otel.kind = "client",
db.system = "postgresql",
db.operation = "SELECT",
db.statement = "SELECT wait/blocked per db from pg_stat_activity + pg_locks",
db.sql.table = "pg_stat_activity"
);
let wait_block_rows = sqlx::query(
r#"
WITH blocked_pids AS (
SELECT bl.pid
FROM pg_locks bl
JOIN pg_locks wl
ON bl.locktype = wl.locktype
AND bl.database IS NOT DISTINCT FROM wl.database
AND bl.relation IS NOT DISTINCT FROM wl.relation
AND bl.page IS NOT DISTINCT FROM wl.page
AND bl.tuple IS NOT DISTINCT FROM wl.tuple
AND bl.virtualxid IS NOT DISTINCT FROM wl.virtualxid
AND bl.transactionid IS NOT DISTINCT FROM wl.transactionid
AND bl.classid IS NOT DISTINCT FROM wl.classid
AND bl.objid IS NOT DISTINCT FROM wl.objid
AND bl.objsubid IS NOT DISTINCT FROM wl.objsubid
AND bl.pid <> wl.pid
WHERE NOT bl.granted AND wl.granted
)
SELECT
a.datname,
COUNT(*) FILTER (WHERE a.wait_event IS NOT NULL)::bigint AS waiting,
COUNT(*) FILTER (WHERE a.pid IN (SELECT pid FROM blocked_pids))::bigint AS blocked
FROM pg_stat_activity a
WHERE a.backend_type = 'client backend'
AND a.pid != pg_backend_pid()
GROUP BY a.datname
ORDER BY a.datname
"#,
)
.fetch_all(pool)
.instrument(q_wait_block)
.await?;
let mut waiting_map: HashMap<String, i64> = HashMap::new();
let mut blocked_map: HashMap<String, i64> = HashMap::new();
for row in &wait_block_rows {
let db: String = row
.try_get::<Option<String>, _>("datname")?
.unwrap_or_else(|| "[unknown]".to_string());
let waiting: i64 = row.try_get::<i64, _>("waiting").unwrap_or(0);
let blocked: i64 = row.try_get::<i64, _>("blocked").unwrap_or(0);
dbs_seen.insert(db.clone());
waiting_map.insert(db.clone(), waiting);
blocked_map.insert(db.clone(), blocked);
self.waiting_connections
.with_label_values(&[&db])
.set(waiting);
self.blocked_connections
.with_label_values(&[&db])
.set(blocked);
debug!(database=%db, waiting, blocked, "set waiting/blocked gauges");
}
for db in &dbs_seen {
if !waiting_map.contains_key(db) {
self.waiting_connections.with_label_values(&[db]).set(0);
}
if !blocked_map.contains_key(db) {
self.blocked_connections.with_label_values(&[db]).set(0);
}
}
Ok(())
})
}
fn enabled_by_default(&self) -> bool {
false
}
}