use crate::collectors::{Collector, i64_to_f64, util::get_excluded_databases};
use anyhow::Result;
use futures::future::BoxFuture;
use prometheus::{Gauge, GaugeVec, IntGauge, IntGaugeVec, Opts, Registry};
use sqlx::{PgPool, Row};
use std::collections::HashMap;
use tracing::{debug, info_span, instrument};
use tracing_futures::Instrument as _;
/// Prometheus gauges describing long-running queries found in `pg_stat_activity`.
///
/// Vector gauges are labelled per database (`datname`); the two scalar gauges
/// aggregate over every database returned by the collection query.
#[derive(Clone)]
pub struct QueriesCollector {
    /// `pg_stat_activity_queries_over_5m`: queries running for more than 5 minutes, per database.
    queries_over_5m: IntGaugeVec,
    /// `pg_stat_activity_queries_over_15m`: queries running for more than 15 minutes, per database.
    queries_over_15m: IntGaugeVec,
    /// `pg_stat_activity_queries_over_1h`: queries running for more than 1 hour, per database.
    queries_over_1h: IntGaugeVec,
    /// `pg_stat_activity_queries_over_6h`: queries running for more than 6 hours, per database.
    queries_over_6h: IntGaugeVec,
    /// `pg_stat_activity_max_query_duration_seconds`: longest running query, per database.
    max_query_duration: GaugeVec,
    /// `pg_stat_activity_oldest_query_age_seconds`: oldest running query across all databases.
    oldest_query_age: Gauge,
    /// `pg_stat_activity_total_long_running`: total queries over 5 minutes across all databases.
    total_long_running: IntGauge,
    /// `pg_stat_activity_long_running_by_state`: long-running queries by `datname` and `state`.
    long_running_by_state: IntGaugeVec,
    /// `pg_stat_activity_long_running_by_wait_event`: long-running queries by `datname` and
    /// `wait_event_type`.
    long_running_by_wait_event: IntGaugeVec,
}
impl Default for QueriesCollector {
fn default() -> Self {
Self::new()
}
}
impl QueriesCollector {
    /// Creates every gauge owned by the collector.
    ///
    /// The gauges are only constructed here; they are attached to a registry
    /// later via `register_metrics`.
    ///
    /// # Panics
    ///
    /// Panics if any gauge cannot be constructed from its name, help text and labels.
    #[must_use]
    // NOTE(review): every name, help text and label below is a literal, which is
    // presumably why `expect` is tolerated here - confirm against project lint policy.
    #[allow(clippy::expect_used)]
    pub fn new() -> Self {
        // Label sets shared by several gauge vectors.
        let by_database = ["datname"];
        let by_database_and_state = ["datname", "state"];
        let by_database_and_wait_event = ["datname", "wait_event_type"];

        // Fields are initialised in the same order the gauges were originally created.
        Self {
            queries_over_5m: IntGaugeVec::new(
                Opts::new(
                    "pg_stat_activity_queries_over_5m",
                    "Number of queries running for more than 5 minutes per database",
                ),
                &by_database,
            )
            .expect("Failed to create pg_stat_activity_queries_over_5m"),

            queries_over_15m: IntGaugeVec::new(
                Opts::new(
                    "pg_stat_activity_queries_over_15m",
                    "Number of queries running for more than 15 minutes per database",
                ),
                &by_database,
            )
            .expect("Failed to create pg_stat_activity_queries_over_15m"),

            queries_over_1h: IntGaugeVec::new(
                Opts::new(
                    "pg_stat_activity_queries_over_1h",
                    "Number of queries running for more than 1 hour per database",
                ),
                &by_database,
            )
            .expect("Failed to create pg_stat_activity_queries_over_1h"),

            queries_over_6h: IntGaugeVec::new(
                Opts::new(
                    "pg_stat_activity_queries_over_6h",
                    "Number of queries running for more than 6 hours per database (likely stuck!)",
                ),
                &by_database,
            )
            .expect("Failed to create pg_stat_activity_queries_over_6h"),

            max_query_duration: GaugeVec::new(
                Opts::new(
                    "pg_stat_activity_max_query_duration_seconds",
                    "Duration in seconds of the longest running query per database",
                ),
                &by_database,
            )
            .expect("Failed to create pg_stat_activity_max_query_duration_seconds"),

            oldest_query_age: Gauge::with_opts(Opts::new(
                "pg_stat_activity_oldest_query_age_seconds",
                "Age in seconds of the oldest running query across all databases. Alert when >3600 (1 hour)",
            ))
            .expect("Failed to create pg_stat_activity_oldest_query_age_seconds"),

            total_long_running: IntGauge::with_opts(Opts::new(
                "pg_stat_activity_total_long_running",
                "Total number of queries running for more than 5 minutes across all databases",
            ))
            .expect("Failed to create pg_stat_activity_total_long_running"),

            long_running_by_state: IntGaugeVec::new(
                Opts::new(
                    "pg_stat_activity_long_running_by_state",
                    "Number of long-running queries (>5min) by database and state (active, waiting, etc.)",
                ),
                &by_database_and_state,
            )
            .expect("Failed to create pg_stat_activity_long_running_by_state"),

            long_running_by_wait_event: IntGaugeVec::new(
                Opts::new(
                    "pg_stat_activity_long_running_by_wait_event",
                    "Number of long-running queries (>5min) by database and wait_event_type (Lock, IO, etc.)",
                ),
                &by_database_and_wait_event,
            )
            .expect("Failed to create pg_stat_activity_long_running_by_wait_event"),
        }
    }
}
/// [`Collector`] implementation that publishes long-running query statistics
/// read from `pg_stat_activity`.
impl Collector for QueriesCollector {
    /// Returns the static identifier of this collector.
    fn name(&self) -> &'static str {
        "queries"
    }

    /// Registers every gauge owned by this collector with `registry`.
    ///
    /// # Errors
    ///
    /// Propagates the first error returned by `Registry::register`; gauges
    /// registered before the failure stay registered.
    #[instrument(
        skip(self, registry),
        level = "info",
        err,
        fields(collector = "queries")
    )]
    fn register_metrics(&self, registry: &Registry) -> Result<()> {
        registry.register(Box::new(self.queries_over_5m.clone()))?;
        registry.register(Box::new(self.queries_over_15m.clone()))?;
        registry.register(Box::new(self.queries_over_1h.clone()))?;
        registry.register(Box::new(self.queries_over_6h.clone()))?;
        registry.register(Box::new(self.max_query_duration.clone()))?;
        registry.register(Box::new(self.oldest_query_age.clone()))?;
        registry.register(Box::new(self.total_long_running.clone()))?;
        registry.register(Box::new(self.long_running_by_state.clone()))?;
        registry.register(Box::new(self.long_running_by_wait_event.clone()))?;
        Ok(())
    }

    /// Queries `pg_stat_activity` for client backends whose current query has
    /// been running for more than five minutes and refreshes all gauges.
    ///
    /// Rows are bucketed per database into the 5m / 15m / 1h / 6h counters, the
    /// per-database maximum duration, and the per-state and per-wait-event
    /// counters. Rows whose `wait_event_type` is NULL (reported as `'None'` by
    /// the query) are not counted in the wait-event gauge.
    ///
    /// All labelled gauge vectors are cleared before being repopulated so that
    /// label combinations absent from the current result set stop being
    /// exported instead of keeping their last value forever.
    ///
    /// # Errors
    ///
    /// Returns an error if the query fails or if `datname`, `state` or
    /// `wait_event_type` cannot be decoded. In that case no gauge is modified.
    #[instrument(
        skip(self, pool),
        level = "info",
        err,
        fields(collector="queries", otel.kind="internal")
    )]
    fn collect<'a>(&'a self, pool: &'a PgPool) -> BoxFuture<'a, Result<()>> {
        Box::pin(async move {
            let excluded: Vec<String> = get_excluded_databases().to_vec();
            let query_span = info_span!(
                "db.query",
                otel.kind = "client",
                db.system = "postgresql",
                db.operation = "SELECT",
                db.statement = "SELECT long-running queries from pg_stat_activity",
                db.sql.table = "pg_stat_activity"
            );
            // The SQL text is kept byte-for-byte as it was; only the Rust around it changed.
            let rows = sqlx::query(
                r"
SELECT
datname,
state,
COALESCE(wait_event_type, 'None') AS wait_event_type,
EXTRACT(EPOCH FROM (now() - query_start))::bigint AS duration_seconds
FROM pg_stat_activity
WHERE backend_type = 'client backend'
AND pid != pg_backend_pid()
AND state != 'idle'
AND query_start IS NOT NULL
AND (now() - query_start) > interval '5 minutes'
AND query NOT LIKE 'autovacuum:%'
AND NOT (COALESCE(datname, '') = ANY($1))
",
            )
            .bind(&excluded)
            .fetch_all(pool)
            .instrument(query_span)
            .await?;

            let mut db_counts_5m: HashMap<String, i64> = HashMap::new();
            let mut db_counts_15m: HashMap<String, i64> = HashMap::new();
            let mut db_counts_1h: HashMap<String, i64> = HashMap::new();
            let mut db_counts_6h: HashMap<String, i64> = HashMap::new();
            let mut db_max_duration: HashMap<String, f64> = HashMap::new();
            let mut state_counts: HashMap<(String, String), i64> = HashMap::new();
            let mut wait_event_counts: HashMap<(String, String), i64> = HashMap::new();
            let mut total_long = 0i64;
            let mut global_oldest = 0f64;

            for row in &rows {
                let db: String = row
                    .try_get::<Option<String>, _>("datname")?
                    .unwrap_or_else(|| "[unknown]".to_string());
                let state: String = row.try_get("state")?;
                let wait_event_type: String = row.try_get("wait_event_type")?;
                // NOTE(review): a decode failure here is silently mapped to 0 seconds,
                // unlike the columns above which propagate the error - confirm intent.
                let duration: i64 = row.try_get("duration_seconds").unwrap_or(0);
                let duration_f64 = i64_to_f64(duration);

                total_long += 1;
                global_oldest = global_oldest.max(duration_f64);

                // Only record a per-database maximum once a positive duration is seen.
                let current_max = db_max_duration.get(&db).copied().unwrap_or(0.0);
                if duration_f64 > current_max {
                    db_max_duration.insert(db.clone(), duration_f64);
                }

                // Every returned row already exceeds five minutes (SQL filter).
                *db_counts_5m.entry(db.clone()).or_default() += 1;
                if duration >= 900 {
                    *db_counts_15m.entry(db.clone()).or_default() += 1;
                }
                if duration >= 3600 {
                    *db_counts_1h.entry(db.clone()).or_default() += 1;
                }
                if duration >= 21600 {
                    *db_counts_6h.entry(db.clone()).or_default() += 1;
                }

                *state_counts.entry((db.clone(), state)).or_default() += 1;
                if wait_event_type != "None" {
                    // Last use of `db` and `wait_event_type`: move them into the key.
                    *wait_event_counts
                        .entry((db, wait_event_type))
                        .or_default() += 1;
                }
            }

            // Drop series left over from the previous scrape. Without this, a
            // database (or state / wait event) that no longer has long-running
            // queries would keep reporting its last non-zero value indefinitely.
            // Done only after the query and decoding succeeded, so a failed
            // scrape leaves the previous values in place.
            self.queries_over_5m.reset();
            self.queries_over_15m.reset();
            self.queries_over_1h.reset();
            self.queries_over_6h.reset();
            self.max_query_duration.reset();
            self.long_running_by_state.reset();
            self.long_running_by_wait_event.reset();

            self.total_long_running.set(total_long);
            self.oldest_query_age.set(global_oldest);

            for (db, count) in &db_counts_5m {
                self.queries_over_5m.with_label_values(&[db]).set(*count);
            }
            for (db, count) in &db_counts_15m {
                self.queries_over_15m.with_label_values(&[db]).set(*count);
            }
            for (db, count) in &db_counts_1h {
                self.queries_over_1h.with_label_values(&[db]).set(*count);
            }
            for (db, count) in &db_counts_6h {
                self.queries_over_6h.with_label_values(&[db]).set(*count);
            }
            for (db, max_dur) in &db_max_duration {
                self.max_query_duration
                    .with_label_values(&[db])
                    .set(*max_dur);
            }
            for ((db, state), count) in &state_counts {
                self.long_running_by_state
                    .with_label_values(&[db, state])
                    .set(*count);
            }
            for ((db, wait_event_type), count) in &wait_event_counts {
                self.long_running_by_wait_event
                    .with_label_values(&[db, wait_event_type])
                    .set(*count);
            }

            debug!(
                total_long_running = total_long,
                oldest_query_age_seconds = global_oldest,
                databases_with_slow_queries = db_counts_5m.len(),
                "updated long-running query metrics"
            );
            Ok(())
        })
    }
}