use crate::collectors::{Collector, util::get_excluded_databases};
use anyhow::Result;
use futures::future::BoxFuture;
use prometheus::{IntGaugeVec, Opts, Registry};
use sqlx::{PgPool, Row};
use std::collections::{HashMap, HashSet};
use tracing::{debug, info_span, instrument};
use tracing_futures::Instrument as _;
/// Exports client-connection statistics derived from PostgreSQL's
/// `pg_stat_activity` view.
///
/// All gauges are created by [`ConnectionsCollector::new`], registered by
/// `register_metrics`, and refreshed by `collect`.
#[derive(Clone)]
pub struct ConnectionsCollector {
    /// `pg_stat_activity_count`, labelled by `datname` and `state`.
    count_by_state: IntGaugeVec,
    /// `pg_stat_activity_active_connections`, labelled by `datname`.
    active_connections: IntGaugeVec,
    /// `pg_stat_activity_idle_connections`, labelled by `datname`.
    idle_connections: IntGaugeVec,
    /// `pg_stat_activity_waiting_connections`, labelled by `datname`.
    waiting_connections: IntGaugeVec,
    /// `pg_stat_activity_blocked_connections`, labelled by `datname`.
    blocked_connections: IntGaugeVec,
}
impl Default for ConnectionsCollector {
fn default() -> Self {
Self::new()
}
}
impl ConnectionsCollector {
pub fn new() -> Self {
let count_by_state = IntGaugeVec::new(
Opts::new(
"pg_stat_activity_count",
"Number of client backends by database and state (from pg_stat_activity)",
),
&["datname", "state"],
)
.expect("Failed to create pg_stat_activity_count");
let active_connections = IntGaugeVec::new(
Opts::new(
"pg_stat_activity_active_connections",
"Number of active client connections per database",
),
&["datname"],
)
.expect("Failed to create pg_stat_activity_active_connections");
let idle_connections = IntGaugeVec::new(
Opts::new(
"pg_stat_activity_idle_connections",
"Number of idle client connections per database",
),
&["datname"],
)
.expect("Failed to create pg_stat_activity_idle_connections");
let waiting_connections = IntGaugeVec::new(
Opts::new(
"pg_stat_activity_waiting_connections",
"Number of client connections currently waiting (wait_event IS NOT NULL) per database",
),
&["datname"],
)
.expect("Failed to create pg_stat_activity_waiting_connections");
let blocked_connections = IntGaugeVec::new(
Opts::new(
"pg_stat_activity_blocked_connections",
"Number of client connections blocked by locks per database",
),
&["datname"],
)
.expect("Failed to create pg_stat_activity_blocked_connections");
Self {
count_by_state,
active_connections,
idle_connections,
waiting_connections,
blocked_connections,
}
}
}
impl Collector for ConnectionsCollector {
fn name(&self) -> &'static str {
"connections"
}
#[instrument(
skip(self, registry),
level = "info",
err,
fields(collector = "connections")
)]
fn register_metrics(&self, registry: &Registry) -> Result<()> {
registry.register(Box::new(self.count_by_state.clone()))?;
registry.register(Box::new(self.active_connections.clone()))?;
registry.register(Box::new(self.idle_connections.clone()))?;
registry.register(Box::new(self.waiting_connections.clone()))?;
registry.register(Box::new(self.blocked_connections.clone()))?;
Ok(())
}
#[instrument(
skip(self, pool),
level = "info",
err,
fields(collector="connections", otel.kind="internal")
)]
fn collect<'a>(&'a self, pool: &'a PgPool) -> BoxFuture<'a, Result<()>> {
Box::pin(async move {
let excluded: Vec<String> = get_excluded_databases().to_vec();
let q_state = info_span!(
"db.query",
otel.kind = "client",
db.system = "postgresql",
db.operation = "SELECT",
db.statement = "SELECT datname, state, COUNT(*) FROM pg_stat_activity (filtered)",
db.sql.table = "pg_stat_activity"
);
let state_rows = sqlx::query(
r#"
SELECT
datname,
COALESCE(state, 'unknown') AS state,
COUNT(*)::bigint AS cnt
FROM pg_stat_activity
WHERE backend_type = 'client backend'
AND pid != pg_backend_pid()
AND NOT (COALESCE(datname, '') = ANY($1))
GROUP BY datname, COALESCE(state, 'unknown')
ORDER BY datname, COALESCE(state, 'unknown')
"#,
)
.bind(&excluded)
.fetch_all(pool)
.instrument(q_state)
.await?;
let mut dbs_seen: HashSet<String> = HashSet::new();
let mut active_map: HashMap<String, i64> = HashMap::new();
let mut idle_map: HashMap<String, i64> = HashMap::new();
for row in &state_rows {
let db: String = row
.try_get::<Option<String>, _>("datname")?
.unwrap_or_else(|| "[unknown]".to_string());
let state: String = row.try_get::<String, _>("state")?;
let cnt: i64 = row.try_get::<i64, _>("cnt").unwrap_or(0);
dbs_seen.insert(db.clone());
self.count_by_state
.with_label_values(&[&db, &state])
.set(cnt);
if state == "active" {
active_map.insert(db.clone(), cnt);
} else if state == "idle" {
idle_map.insert(db.clone(), cnt);
}
}
for db in &dbs_seen {
let a = *active_map.get(db).unwrap_or(&0);
let i = *idle_map.get(db).unwrap_or(&0);
self.active_connections.with_label_values(&[db]).set(a);
self.idle_connections.with_label_values(&[db]).set(i);
debug!(database=%db, active=a, idle=i, "set active/idle gauges");
}
let q_wait_block = info_span!(
"db.query",
otel.kind = "client",
db.system = "postgresql",
db.operation = "SELECT",
db.statement = "SELECT wait/blocked per db from pg_stat_activity (filtered + pg_blocking_pids)",
db.sql.table = "pg_stat_activity"
);
let wait_block_rows = sqlx::query(
r#"
SELECT
a.datname,
COUNT(*) FILTER (WHERE a.wait_event IS NOT NULL)::bigint AS waiting,
COUNT(*) FILTER (WHERE cardinality(pg_blocking_pids(a.pid)) > 0)::bigint AS blocked
FROM pg_stat_activity a
WHERE a.backend_type = 'client backend'
AND a.pid != pg_backend_pid()
AND NOT (COALESCE(a.datname, '') = ANY($1))
GROUP BY a.datname
ORDER BY a.datname
"#,
)
.bind(&excluded)
.fetch_all(pool)
.instrument(q_wait_block)
.await?;
let mut waiting_map: HashMap<String, i64> = HashMap::new();
let mut blocked_map: HashMap<String, i64> = HashMap::new();
for row in &wait_block_rows {
let db: String = row
.try_get::<Option<String>, _>("datname")?
.unwrap_or_else(|| "[unknown]".to_string());
let waiting: i64 = row.try_get::<i64, _>("waiting").unwrap_or(0);
let blocked: i64 = row.try_get::<i64, _>("blocked").unwrap_or(0);
dbs_seen.insert(db.clone());
waiting_map.insert(db.clone(), waiting);
blocked_map.insert(db.clone(), blocked);
self.waiting_connections
.with_label_values(&[&db])
.set(waiting);
self.blocked_connections
.with_label_values(&[&db])
.set(blocked);
debug!(database=%db, waiting, blocked, "set waiting/blocked gauges");
}
for db in &dbs_seen {
if !waiting_map.contains_key(db) {
self.waiting_connections.with_label_values(&[db]).set(0);
}
if !blocked_map.contains_key(db) {
self.blocked_connections.with_label_values(&[db]).set(0);
}
}
Ok(())
})
}
}