use fraiseql_error::{FraiseQLError, Result};
use tokio_postgres::Row;
use super::PostgresAdapter;
use crate::types::QueryStatEntry;
impl PostgresAdapter {
    /// Reports whether the `pg_stat_statements` view can be queried.
    ///
    /// Runs a zero-row probe (`LIMIT 0`) against the view. Any failure of the
    /// probe query itself is reported as `Ok(false)` rather than as an error,
    /// so callers treat "cannot query the view" as "statistics unavailable".
    ///
    /// # Errors
    ///
    /// Only a failure to acquire a connection is propagated.
    async fn has_pg_stat_statements(&self) -> Result<bool> {
        let conn = self.acquire_connection_with_retry().await?;
        let probe = conn.query("SELECT 1 FROM pg_stat_statements LIMIT 0", &[]).await;
        // The rows (there are none) and the error details are both irrelevant;
        // only success vs. failure matters here.
        Ok(probe.is_ok())
    }

    /// Converts one `pg_stat_statements` result row into a [`QueryStatEntry`].
    ///
    /// Every column is read leniently: a column that cannot be read (missing,
    /// NULL, or not decodable as the requested type) falls back to an empty
    /// string or zero instead of failing the whole row. As written, this
    /// function therefore always returns `Ok`.
    ///
    /// `cache_hit_ratio` is `shared_blks_hit / (shared_blks_hit + shared_blks_read)`,
    /// or `None` when that sum is not positive.
    fn map_pg_stat_row(row: &Row) -> Result<QueryStatEntry> {
        let blocks_hit = row.try_get::<_, i64>("shared_blks_hit").unwrap_or(0);
        let blocks_read = row.try_get::<_, i64>("shared_blks_read").unwrap_or(0);

        // Precision loss converting block counters to f64 is acceptable for a ratio.
        #[allow(clippy::cast_precision_loss)]
        let cache_hit_ratio = match blocks_hit + blocks_read {
            blocks_total if blocks_total > 0 => Some(blocks_hit as f64 / blocks_total as f64),
            _ => None,
        };

        let query_id: String = row.try_get("query_id").unwrap_or_default();
        let query_text: String = row.try_get("query").unwrap_or_default();
        // Counters arrive as signed 64-bit integers; expose them as unsigned magnitudes.
        let calls = row.try_get::<_, i64>("calls").unwrap_or(0).unsigned_abs();
        let rows_returned = row.try_get::<_, i64>("rows").unwrap_or(0).unsigned_abs();

        let database_specific = serde_json::json!({
            "shared_blks_hit": blocks_hit,
            "shared_blks_read": blocks_read,
        });

        Ok(QueryStatEntry {
            query_id,
            query_text,
            calls,
            total_exec_time_ms: row.try_get("total_exec_time").unwrap_or(0.0),
            mean_exec_time_ms: row.try_get("mean_exec_time").unwrap_or(0.0),
            min_exec_time_ms: row.try_get("min_exec_time").unwrap_or(0.0),
            max_exec_time_ms: row.try_get("max_exec_time").unwrap_or(0.0),
            rows_returned,
            cache_hit_ratio,
            database_specific,
        })
    }

    /// Returns up to `limit` statement statistics, ordered by total execution
    /// time (highest first).
    ///
    /// Yields an empty vector when the `pg_stat_statements` view cannot be
    /// queried (see `has_pg_stat_statements`).
    ///
    /// # Errors
    ///
    /// Propagates connection-acquisition failures, and returns
    /// `FraiseQLError::Database` (carrying the SQLSTATE when one is available)
    /// if the statistics query fails.
    pub(crate) async fn pg_query_stats(&self, limit: u32) -> Result<Vec<QueryStatEntry>> {
        let stats_available = self.has_pg_stat_statements().await?;
        if !stats_available {
            return Ok(Vec::new());
        }

        let sql = "SELECT \
                   queryid::text AS query_id, \
                   query, \
                   calls, \
                   total_exec_time, \
                   mean_exec_time, \
                   min_exec_time, \
                   max_exec_time, \
                   rows, \
                   shared_blks_hit, \
                   shared_blks_read \
                   FROM pg_stat_statements \
                   ORDER BY total_exec_time DESC \
                   LIMIT $1";
        // The LIMIT parameter is bound as a 64-bit integer.
        let row_limit = i64::from(limit);

        let conn = self.acquire_connection_with_retry().await?;
        let stat_rows = conn
            .query(sql, &[&row_limit])
            .await
            .map_err(|e| FraiseQLError::Database {
                message: format!("Failed to query pg_stat_statements: {e}"),
                sql_state: e.code().map(|state| state.code().to_string()),
            })?;

        let mut entries = Vec::with_capacity(stat_rows.len());
        for stat_row in &stat_rows {
            entries.push(Self::map_pg_stat_row(stat_row)?);
        }
        Ok(entries)
    }

    /// Looks up the statistics for a single statement by its query id, compared
    /// against the textual form of `queryid`.
    ///
    /// Returns `Ok(None)` when the `pg_stat_statements` view cannot be queried
    /// or when no row matches. If several rows match, the first row returned by
    /// the server is used; the query imposes no ordering.
    ///
    /// # Errors
    ///
    /// Propagates connection-acquisition failures, and returns
    /// `FraiseQLError::Database` (carrying the SQLSTATE when one is available)
    /// if the lookup query fails.
    pub(crate) async fn pg_query_stats_by_id(&self, id: &str) -> Result<Option<QueryStatEntry>> {
        let stats_available = self.has_pg_stat_statements().await?;
        if !stats_available {
            return Ok(None);
        }

        let sql = "SELECT \
                   queryid::text AS query_id, \
                   query, \
                   calls, \
                   total_exec_time, \
                   mean_exec_time, \
                   min_exec_time, \
                   max_exec_time, \
                   rows, \
                   shared_blks_hit, \
                   shared_blks_read \
                   FROM pg_stat_statements \
                   WHERE queryid::text = $1";

        let conn = self.acquire_connection_with_retry().await?;
        let matches = conn
            .query(sql, &[&id])
            .await
            .map_err(|e| FraiseQLError::Database {
                message: format!("Failed to query pg_stat_statements by id: {e}"),
                sql_state: e.code().map(|state| state.code().to_string()),
            })?;

        match matches.first() {
            Some(stat_row) => Self::map_pg_stat_row(stat_row).map(Some),
            None => Ok(None),
        }
    }

    /// Clears the collected statement statistics by calling
    /// `pg_stat_statements_reset()`.
    ///
    /// Does nothing (and returns `Ok(())`) when the `pg_stat_statements` view
    /// cannot be queried.
    ///
    /// # Errors
    ///
    /// Propagates connection-acquisition failures, and returns
    /// `FraiseQLError::Database` (carrying the SQLSTATE when one is available)
    /// if the reset call fails.
    pub(crate) async fn pg_reset_query_stats(&self) -> Result<()> {
        let stats_available = self.has_pg_stat_statements().await?;
        if !stats_available {
            return Ok(());
        }

        let conn = self.acquire_connection_with_retry().await?;
        // The row count reported by `execute` carries no information here.
        let _affected = conn
            .execute("SELECT pg_stat_statements_reset()", &[])
            .await
            .map_err(|e| FraiseQLError::Database {
                message: format!("Failed to reset pg_stat_statements: {e}"),
                sql_state: e.code().map(|state| state.code().to_string()),
            })?;
        Ok(())
    }
}