use crate::collectors::{
Collector, i64_to_f64,
util::{MS_TO_SEC, TEMPLATE0, TEMPLATE1},
};
use anyhow::Result;
use futures::future::BoxFuture;
use prometheus::{GaugeVec, IntGaugeVec, Opts, Registry};
use sqlx::{postgres::PgRow, PgPool, Row};
use std::{
sync::{Arc, Mutex, MutexGuard},
time::{Duration, Instant},
};
use tracing::{debug, info_span, instrument, warn};
use tracing_futures::Instrument as _;
/// Collector that exports per-statement metrics read from `pg_stat_statements`.
///
/// It holds one labelled gauge vector per exported column, a derived cache hit
/// ratio, the row limit used by the collection query, and a cached answer to
/// "is the extension installed?". Clones share that cached answer because it
/// lives behind an `Arc`.
#[derive(Clone)]
pub struct PgStatementsCollector {
// Execution-time statistics; the collection query converts them to seconds.
total_exec_time: GaugeVec, mean_exec_time: GaugeVec, max_exec_time: GaugeVec, stddev_exec_time: GaugeVec,
// Number of executions and number of rows retrieved or affected.
calls: IntGaugeVec, rows: IntGaugeVec,
// Shared buffer block counters.
shared_blks_hit: IntGaugeVec, shared_blks_read: IntGaugeVec, shared_blks_dirtied: IntGaugeVec, shared_blks_written: IntGaugeVec,
// Local buffer block counters.
local_blks_hit: IntGaugeVec, local_blks_read: IntGaugeVec, local_blks_dirtied: IntGaugeVec, local_blks_written: IntGaugeVec,
// Temp file block counters.
temp_blks_read: IntGaugeVec, temp_blks_written: IntGaugeVec,
// WAL bytes generated by the statement (NULL is reported as 0 by the query).
wal_bytes: IntGaugeVec,
// Derived per row: shared_blks_hit / (shared_blks_hit + shared_blks_read), 1.0 when both are zero.
cache_hit_ratio: GaugeVec,
// Maximum number of statements fetched per collection (the SQL `LIMIT`).
top_n: usize,
// Cached result of the extension check, shared between clones.
extension_state: Arc<Mutex<ExtensionState>>,
}
/// Cached knowledge about whether the `pg_stat_statements` extension is installed.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum ExtensionState {
/// No check result has been recorded yet.
Unknown,
/// A check found the extension; this answer is served from the cache from then on.
Installed,
/// A check did not find the extension; `last_checked` decides when to look again.
Missing { last_checked: Instant },
}
/// How long a "missing" result is trusted before the extension check is repeated.
const MISSING_EXTENSION_RECHECK_AFTER: Duration = Duration::from_mins(1);
impl PgStatementsCollector {
/// Builds a collector that reports at most `top_n` statements per collection.
///
/// All gauge vectors are created here but not registered; call
/// `register_metrics` for that. The extension state starts as
/// `ExtensionState::Unknown`, so the first availability lookup has no cached
/// answer to return.
#[must_use]
pub fn with_top_n(top_n: usize) -> Self {
    Self {
        // Execution-time statistics (float gauges).
        total_exec_time: statement_gauge(
            "pg_stat_statements_total_exec_time_seconds",
            "Total time spent executing this query (seconds)",
        ),
        mean_exec_time: statement_gauge(
            "pg_stat_statements_mean_exec_time_seconds",
            "Mean time per execution (seconds) - key for finding slow queries",
        ),
        max_exec_time: statement_gauge(
            "pg_stat_statements_max_exec_time_seconds",
            "Maximum execution time observed (seconds)",
        ),
        stddev_exec_time: statement_gauge(
            "pg_stat_statements_stddev_exec_time_seconds",
            "Standard deviation of execution time - high value indicates inconsistent performance",
        ),

        // Call and row counts.
        calls: statement_int_gauge(
            "pg_stat_statements_calls_total",
            "Number of times this query has been executed",
        ),
        rows: statement_int_gauge(
            "pg_stat_statements_rows_total",
            "Total number of rows retrieved or affected by this query",
        ),

        // Shared buffer blocks.
        shared_blks_hit: statement_int_gauge(
            "pg_stat_statements_shared_blks_hit_total",
            "Shared block cache hits (found in memory)",
        ),
        shared_blks_read: statement_int_gauge(
            "pg_stat_statements_shared_blks_read_total",
            "Shared blocks read from disk (cache miss - expensive!)",
        ),
        shared_blks_dirtied: statement_int_gauge(
            "pg_stat_statements_shared_blks_dirtied_total",
            "Shared blocks dirtied (modified)",
        ),
        shared_blks_written: statement_int_gauge(
            "pg_stat_statements_shared_blks_written_total",
            "Shared blocks written to disk",
        ),

        // Local buffer blocks.
        local_blks_hit: statement_int_gauge(
            "pg_stat_statements_local_blks_hit_total",
            "Local block cache hits (temp tables)",
        ),
        local_blks_read: statement_int_gauge(
            "pg_stat_statements_local_blks_read_total",
            "Local blocks read from disk (temp tables)",
        ),
        local_blks_dirtied: statement_int_gauge(
            "pg_stat_statements_local_blks_dirtied_total",
            "Local blocks dirtied (temp tables)",
        ),
        local_blks_written: statement_int_gauge(
            "pg_stat_statements_local_blks_written_total",
            "Local blocks written to disk (temp tables)",
        ),

        // Temp file blocks.
        temp_blks_read: statement_int_gauge(
            "pg_stat_statements_temp_blks_read_total",
            "Temp file blocks read - query spilled to disk (work_mem too small!)",
        ),
        temp_blks_written: statement_int_gauge(
            "pg_stat_statements_temp_blks_written_total",
            "Temp file blocks written - query spilled to disk (work_mem too small!)",
        ),

        // WAL volume and the derived hit ratio.
        wal_bytes: statement_int_gauge(
            "pg_stat_statements_wal_bytes_total",
            "WAL bytes generated by this query",
        ),
        cache_hit_ratio: statement_gauge(
            "pg_stat_statements_cache_hit_ratio",
            "Cache hit ratio for this query (0.0-1.0, higher is better)",
        ),

        top_n,
        extension_state: Arc::new(Mutex::new(ExtensionState::Unknown)),
    }
}
/// Flattens `query` onto a single line and caps it at `max_len` bytes.
///
/// Every run of whitespace (spaces, tabs, line breaks, blank lines) becomes a
/// single space, and leading/trailing whitespace is dropped, so the result is
/// a compact one-line value suitable for a metric label.
///
/// When the flattened text is longer than `max_len` bytes it is cut at the
/// closest UTF-8 character boundary at or below `max_len` and `...` is
/// appended. The returned string can therefore be up to `max_len + 3` bytes.
fn truncate_query(query: &str, max_len: usize) -> String {
    // Join the whitespace-separated words with single spaces. This also
    // removes the doubled spaces that blank lines used to leave behind.
    let mut cleaned = String::with_capacity(query.len());
    for word in query.split_whitespace() {
        if !cleaned.is_empty() {
            cleaned.push(' ');
        }
        cleaned.push_str(word);
    }

    if cleaned.len() <= max_len {
        return cleaned;
    }

    // Cutting at an arbitrary byte offset would panic inside a multi-byte
    // character, so step back to the nearest boundary first.
    let trunc_at = cleaned.floor_char_boundary(max_len);
    cleaned.truncate(trunc_at);
    cleaned.push_str("...");
    cleaned
}
/// Builds the SQL text used to fetch the top statements from `pg_stat_statements`.
///
/// The query:
/// - joins `pg_database` (inner) and `pg_roles` (left, falling back to
///   `'<unknown>'` when the role no longer resolves);
/// - returns at most the first 80 characters of each statement as `query_short`;
/// - divides the timing columns by `MS_TO_SEC` and aliases them `*_sec`;
/// - skips rows with a NULL `queryid`, rows with no recorded execution time,
///   and the databases named by `TEMPLATE0` / `TEMPLATE1`;
/// - skips statements whose whitespace-normalised text starts with this
///   query's own `SELECT queryid::text, d.datname,` prefix, so the collector
///   does not report itself;
/// - orders by `total_exec_time` descending and limits the result to `top_n` rows.
///
/// NOTE(review): `MS_TO_SEC`, `TEMPLATE0` and `TEMPLATE1` come from
/// `collectors::util` and are not visible here — presumably 1000 and the two
/// template database names; confirm there.
fn build_pg_statements_query(&self) -> String {
format!(
r"
SELECT
queryid::text,
d.datname,
COALESCE(r.rolname, '<unknown>') as usename,
LEFT(query, 80) as query_short,
calls::bigint,
(total_exec_time / {MS_TO_SEC})::double precision as total_exec_time_sec,
(mean_exec_time / {MS_TO_SEC})::double precision as mean_exec_time_sec,
(max_exec_time / {MS_TO_SEC})::double precision as max_exec_time_sec,
(stddev_exec_time / {MS_TO_SEC})::double precision as stddev_exec_time_sec,
rows::bigint,
shared_blks_hit::bigint,
shared_blks_read::bigint,
shared_blks_dirtied::bigint,
shared_blks_written::bigint,
local_blks_hit::bigint,
local_blks_read::bigint,
local_blks_dirtied::bigint,
local_blks_written::bigint,
temp_blks_read::bigint,
temp_blks_written::bigint,
COALESCE(wal_bytes, 0)::bigint as wal_bytes
FROM pg_stat_statements s
JOIN pg_database d ON d.oid = s.dbid
LEFT JOIN pg_roles r ON r.oid = s.userid
WHERE queryid IS NOT NULL
AND total_exec_time > 0
AND d.datname NOT IN ('{TEMPLATE0}', '{TEMPLATE1}')
AND BTRIM(REGEXP_REPLACE(query, '[[:space:]]+', ' ', 'g')) NOT LIKE 'SELECT queryid::text, d.datname,%'
ORDER BY total_exec_time DESC
LIMIT {}
",
// Fills the single positional `{}` placeholder after LIMIT.
self.top_n
)
}
/// Locks the cached extension state.
///
/// A poisoned mutex is not treated as an error: the guard is recovered from
/// the poison error and returned, so callers always get access to the state.
fn extension_state_lock(&self) -> MutexGuard<'_, ExtensionState> {
    self.extension_state
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner())
}
/// Returns the cached answer to "is the extension installed?", if one is usable.
///
/// - `Some(true)` once the extension has been seen as installed.
/// - `Some(false)` while a "missing" result is younger than
///   `MISSING_EXTENSION_RECHECK_AFTER`.
/// - `None` when nothing is cached yet or the "missing" result has expired,
///   meaning the caller has to query the database.
fn cached_extension_availability(&self) -> Option<bool> {
    // The state is `Copy`, so take a snapshot and release the lock right away.
    let snapshot = *self.extension_state_lock();
    match snapshot {
        ExtensionState::Installed => Some(true),
        ExtensionState::Unknown => None,
        ExtensionState::Missing { last_checked } => {
            let still_fresh = last_checked.elapsed() < MISSING_EXTENSION_RECHECK_AFTER;
            still_fresh.then_some(false)
        }
    }
}
/// Stores the outcome of an extension check in the shared cache.
///
/// A negative result is stamped with the current instant so it can expire later.
fn update_extension_state(&self, installed: bool) {
    let next = if installed {
        ExtensionState::Installed
    } else {
        ExtensionState::Missing {
            last_checked: Instant::now(),
        }
    };
    *self.extension_state_lock() = next;
}
/// Tells whether `pg_stat_statements` can be queried through `pool`.
///
/// A usable cached answer is returned without touching the database.
/// Otherwise the extension catalog is queried, the result is cached, and a
/// warning is logged when the extension turns out to be missing.
///
/// # Errors
///
/// Propagates the error from the catalog lookup; the cache is left untouched
/// in that case.
async fn pg_statements_available(&self, pool: &PgPool) -> Result<bool> {
    let installed = match self.cached_extension_availability() {
        Some(cached) => return Ok(cached),
        None => pg_statements_installed(pool).await?,
    };
    self.update_extension_state(installed);

    if installed {
        return Ok(true);
    }

    warn!(
        collector = "pg_statements",
        "pg_stat_statements extension not installed - skipping collection"
    );
    Ok(false)
}
/// Publishes one `pg_stat_statements` row to every gauge of this collector.
///
/// The row is identified by the labels `queryid`, `datname`, `usename` and
/// `query_short`. A column that is missing or cannot be decoded does not
/// abort the row: text labels fall back to `"unknown"` (`"<utility>"` for the
/// query text) and numeric values fall back to zero.
///
/// The cache hit ratio is `shared_blks_hit / (shared_blks_hit +
/// shared_blks_read)`, or `1.0` when no shared blocks were touched.
fn record_statement_row(&self, row: &PgRow) {
    // Column readers with the per-type fallback baked in.
    let text = |column: &str| -> String {
        row.try_get(column)
            .unwrap_or_else(|_| "unknown".to_string())
    };
    let seconds = |column: &str| -> f64 { row.try_get(column).unwrap_or(0.0) };
    let count = |column: &str| -> i64 { row.try_get(column).unwrap_or(0) };

    let queryid = text("queryid");
    let datname = text("datname");
    let usename = text("usename");
    let query_short = match row.try_get::<String, _>("query_short") {
        Ok(raw) => Self::truncate_query(&raw, 80),
        Err(_) => "<utility>".to_string(),
    };
    let labels = [
        queryid.as_str(),
        datname.as_str(),
        usename.as_str(),
        query_short.as_str(),
    ];

    // Timing gauges, already expressed in seconds by the query.
    for (gauge, column) in [
        (&self.total_exec_time, "total_exec_time_sec"),
        (&self.mean_exec_time, "mean_exec_time_sec"),
        (&self.max_exec_time, "max_exec_time_sec"),
        (&self.stddev_exec_time, "stddev_exec_time_sec"),
    ] {
        gauge.with_label_values(&labels).set(seconds(column));
    }

    // These two are read once because the hit ratio below needs them as well.
    let shared_hit = count("shared_blks_hit");
    let shared_read = count("shared_blks_read");

    for (gauge, value) in [
        (&self.calls, count("calls")),
        (&self.rows, count("rows")),
        (&self.shared_blks_hit, shared_hit),
        (&self.shared_blks_read, shared_read),
        (&self.shared_blks_dirtied, count("shared_blks_dirtied")),
        (&self.shared_blks_written, count("shared_blks_written")),
        (&self.local_blks_hit, count("local_blks_hit")),
        (&self.local_blks_read, count("local_blks_read")),
        (&self.local_blks_dirtied, count("local_blks_dirtied")),
        (&self.local_blks_written, count("local_blks_written")),
        (&self.temp_blks_read, count("temp_blks_read")),
        (&self.temp_blks_written, count("temp_blks_written")),
        (&self.wal_bytes, count("wal_bytes")),
    ] {
        gauge.with_label_values(&labels).set(value);
    }

    // A statement that touched no shared blocks is reported as a perfect hit ratio.
    let total_blocks = shared_hit + shared_read;
    let hit_ratio = if total_blocks > 0 {
        i64_to_f64(shared_hit) / i64_to_f64(total_blocks)
    } else {
        1.0
    };
    self.cache_hit_ratio
        .with_label_values(&labels)
        .set(hit_ratio);
}
}
/// Label names attached to every statement metric, in the order label values are supplied.
const STATEMENT_LABELS: [&str; 4] = ["queryid", "datname", "usename", "query_short"];
/// Creates a float gauge vector in the `postgres` namespace, labelled with
/// `STATEMENT_LABELS`.
///
/// # Panics
///
/// Panics if the metric cannot be constructed from `name`, `help` and the
/// label set. Callers in this file pass string literals, so a failure here
/// indicates a programming error rather than a runtime condition.
#[allow(clippy::expect_used)]
fn statement_gauge(name: &str, help: &str) -> GaugeVec {
    let opts = Opts::new(name, help).namespace("postgres");
    GaugeVec::new(opts, &STATEMENT_LABELS).expect("pg_stat_statements gauge metric")
}
/// Creates an integer gauge vector in the `postgres` namespace, labelled with
/// `STATEMENT_LABELS`.
///
/// # Panics
///
/// Panics if the metric cannot be constructed from `name`, `help` and the
/// label set. Callers in this file pass string literals, so a failure here
/// indicates a programming error rather than a runtime condition.
#[allow(clippy::expect_used)]
fn statement_int_gauge(name: &str, help: &str) -> IntGaugeVec {
    let opts = Opts::new(name, help).namespace("postgres");
    IntGaugeVec::new(opts, &STATEMENT_LABELS).expect("pg_stat_statements int metric")
}
/// Checks `pg_extension` for a `pg_stat_statements` entry using `pool`.
///
/// # Errors
///
/// Returns the underlying database error if the lookup query fails.
async fn pg_statements_installed(pool: &PgPool) -> Result<bool> {
    let extension_row =
        sqlx::query("SELECT 1 FROM pg_extension WHERE extname = 'pg_stat_statements'")
            .fetch_optional(pool)
            .await?;
    Ok(extension_row.is_some())
}
impl Collector for PgStatementsCollector {
/// Returns this collector's identifier, `"pg_statements"` — the same value
/// used for the `collector` field in this file's log events.
fn name(&self) -> &'static str {
"pg_statements"
}
#[instrument(
skip(self, registry),
level = "info",
err,
fields(collector = "pg_statements")
)]
fn register_metrics(&self, registry: &Registry) -> Result<()> {
registry.register(Box::new(self.total_exec_time.clone()))?;
registry.register(Box::new(self.mean_exec_time.clone()))?;
registry.register(Box::new(self.max_exec_time.clone()))?;
registry.register(Box::new(self.stddev_exec_time.clone()))?;
registry.register(Box::new(self.calls.clone()))?;
registry.register(Box::new(self.rows.clone()))?;
registry.register(Box::new(self.shared_blks_hit.clone()))?;
registry.register(Box::new(self.shared_blks_read.clone()))?;
registry.register(Box::new(self.shared_blks_dirtied.clone()))?;
registry.register(Box::new(self.shared_blks_written.clone()))?;
registry.register(Box::new(self.local_blks_hit.clone()))?;
registry.register(Box::new(self.local_blks_read.clone()))?;
registry.register(Box::new(self.local_blks_dirtied.clone()))?;
registry.register(Box::new(self.local_blks_written.clone()))?;
registry.register(Box::new(self.temp_blks_read.clone()))?;
registry.register(Box::new(self.temp_blks_written.clone()))?;
registry.register(Box::new(self.wal_bytes.clone()))?;
registry.register(Box::new(self.cache_hit_ratio.clone()))?;
debug!(collector = "pg_statements", "registered metrics");
Ok(())
}
/// Refreshes all statement gauges from `pg_stat_statements`.
///
/// Returns `Ok(())` without doing anything when the extension is not
/// available. Otherwise the rows are fetched first, and only after a
/// successful fetch are the gauges cleared and repopulated, so a failed query
/// leaves the previously published values in place.
///
/// The returned future runs inside the `pg_statements.collect` span.
fn collect<'a>(&'a self, pool: &'a PgPool) -> BoxFuture<'a, Result<()>> {
    Box::pin(
        async move {
            let available = self.pg_statements_available(pool).await?;
            if !available {
                return Ok(());
            }

            let sql = self.build_pg_statements_query();
            let statements: Vec<PgRow> = sqlx::query(&sql).fetch_all(pool).await?;

            // Clear the previous series so statements that dropped out of
            // the result set stop being exported.
            for gauge in [
                &self.total_exec_time,
                &self.mean_exec_time,
                &self.max_exec_time,
                &self.stddev_exec_time,
                &self.cache_hit_ratio,
            ] {
                gauge.reset();
            }
            for gauge in [
                &self.calls,
                &self.rows,
                &self.shared_blks_hit,
                &self.shared_blks_read,
                &self.shared_blks_dirtied,
                &self.shared_blks_written,
                &self.local_blks_hit,
                &self.local_blks_read,
                &self.local_blks_dirtied,
                &self.local_blks_written,
                &self.temp_blks_read,
                &self.temp_blks_written,
                &self.wal_bytes,
            ] {
                gauge.reset();
            }

            for statement in &statements {
                self.record_statement_row(statement);
            }

            debug!(
                collector = "pg_statements",
                queries_tracked = statements.len(),
                "collected pg_stat_statements metrics"
            );
            Ok(())
        }
        .instrument(info_span!("pg_statements.collect")),
    )
}
/// Returns `false`: this collector reports itself as not enabled by default.
fn enabled_by_default(&self) -> bool {
false }
}
// Unit tests for the parts of the collector that need no database connection:
// identity, query-text truncation, the generated SQL and the extension cache.
#[cfg(test)]
mod tests {
use super::*;
// The collector identifies itself as "pg_statements".
#[test]
fn test_pg_statements_collector_name() {
let collector = PgStatementsCollector::with_top_n(25);
assert_eq!(collector.name(), "pg_statements");
}
// The collector reports itself as not enabled by default.
#[test]
fn test_pg_statements_collector_not_enabled_by_default() {
let collector = PgStatementsCollector::with_top_n(25);
assert!(!collector.enabled_by_default());
}
// Short queries pass through unchanged; long ones are cut to 80 bytes plus "...".
#[test]
fn test_truncate_query() {
let short = "SELECT * FROM users";
assert_eq!(PgStatementsCollector::truncate_query(short, 80), short);
let long = "SELECT * FROM users WHERE id = 1 AND name = 'test' AND email = 'test@example.com' AND created_at > NOW()";
let truncated = PgStatementsCollector::truncate_query(long, 80);
// 80 kept bytes + the 3-byte "..." suffix.
assert_eq!(truncated.len(), 83); assert!(truncated.ends_with("..."));
}
// Line breaks and per-line indentation are flattened to single spaces.
#[test]
fn test_truncate_query_multiline() {
let multiline = "SELECT *\n FROM users\n WHERE id = 1";
let result = PgStatementsCollector::truncate_query(multiline, 80);
assert_eq!(result, "SELECT * FROM users WHERE id = 1");
}
// Byte 80 falls inside the two-byte character, so the cut must step back to byte 79
// instead of panicking or splitting the character.
#[test]
fn test_truncate_query_utf8_boundary() {
let prefix = "a".repeat(79);
let query = format!("{prefix}ı");
let result = PgStatementsCollector::truncate_query(&query, 80);
assert_eq!(result, format!("{prefix}..."));
}
// Roles are LEFT JOINed with an '<unknown>' fallback for statements whose role does not resolve.
#[test]
fn test_build_pg_statements_query_uses_roles_left_join() {
let collector = PgStatementsCollector::with_top_n(25);
let query = collector.build_pg_statements_query();
assert!(query.contains("LEFT JOIN pg_roles r ON r.oid = s.userid"));
assert!(query.contains("COALESCE(r.rolname, '<unknown>') as usename"));
}
// The generated SQL filters out the collector's own statement.
#[test]
fn test_build_pg_statements_query_excludes_self_query() {
let collector = PgStatementsCollector::with_top_n(25);
let query = collector.build_pg_statements_query();
assert!(query.contains(
"AND BTRIM(REGEXP_REPLACE(query, '[[:space:]]+', ' ', 'g')) NOT LIKE 'SELECT queryid::text, d.datname,%'"
));
}
// Once recorded as installed, the cache answers `Some(true)`.
#[test]
fn test_cached_extension_availability_uses_installed_cache() {
let collector = PgStatementsCollector::with_top_n(25);
collector.update_extension_state(true);
assert_eq!(collector.cached_extension_availability(), Some(true));
}
// A fresh "missing" result is served from the cache as `Some(false)`.
#[test]
fn test_cached_extension_availability_uses_missing_cache_before_ttl() {
let collector = PgStatementsCollector::with_top_n(25);
{
// Scope the guard so the lock is released before the assertion locks again.
let mut state = collector.extension_state_lock();
*state = ExtensionState::Missing {
last_checked: Instant::now(),
};
}
assert_eq!(collector.cached_extension_availability(), Some(false));
}
// A "missing" result at least as old as the re-check interval yields `None`, forcing a new lookup.
#[test]
fn test_cached_extension_availability_rechecks_missing_after_ttl() {
let collector = PgStatementsCollector::with_top_n(25);
// NOTE(review): if `checked_sub` returns `None`, the `Instant::now()` fallback makes the
// entry look fresh and the final assertion fails instead of the test being skipped.
let expired_check = Instant::now()
.checked_sub(MISSING_EXTENSION_RECHECK_AFTER)
.unwrap_or_else(Instant::now);
{
// Scope the guard so the lock is released before the assertion locks again.
let mut state = collector.extension_state_lock();
*state = ExtensionState::Missing {
last_checked: expired_check,
};
}
assert_eq!(collector.cached_extension_availability(), None);
}
}