use chrono::{DateTime, Utc};
use serde::Serialize;
use sqlx::Error;
use sqlx::postgres::PgPool;
use std::collections::BTreeMap;
/// Retention value, in seconds, reported as `config_default_seconds` on the
/// `gateway-log-retention` advisory (1_209_600 s = 14 days).
const DEFAULT_GATEWAY_LOG_RETENTION_SECS: i64 = 1_209_600;
/// Estimated-row count at or above which a table is listed by the
/// `large-table-pressure` advisory; the same threshold escalates the gateway
/// log advisory to `critical`.
const LARGE_TABLE_PRESSURE_THRESHOLD: i64 = 10_000_000;
/// Minimum estimated rows a log-like table (matched by name suffix) must have
/// before it is listed by the `log-like-table-growth` advisory.
const LOG_LIKE_TABLE_MIN_ROWS: i64 = 100_000;
/// One table and its row estimate as read from the database catalog.
///
/// Derives `sqlx::FromRow`, so it is decoded from the `schema_name`,
/// `table_name` and `estimated_rows` columns selected by
/// `collect_table_row_estimates`.
#[derive(Debug, Clone, Serialize, sqlx::FromRow)]
pub struct TableRowEstimateCatalogRow {
/// Name of the schema that owns the table.
pub schema_name: String,
/// Unqualified table name.
pub table_name: String,
/// Estimated row count for the table.
pub estimated_rows: i64,
}
/// Serializable table estimate that also carries the `schema.table` name.
///
/// Produced from a `TableRowEstimateCatalogRow` by `table_row_estimate_entry`.
#[derive(Debug, Clone, Serialize)]
pub struct TableRowEstimateEntry {
/// Name of the schema that owns the table.
pub schema_name: String,
/// Unqualified table name.
pub table_name: String,
/// Schema and table name joined with a dot, e.g. `public.orders`.
pub qualified_table_name: String,
/// Estimated row count copied from the catalog row.
pub estimated_rows: i64,
}
/// Per-schema aggregate of the table estimates in a snapshot.
#[derive(Debug, Clone, Serialize)]
pub struct TableRowEstimateSchemaRollup {
/// Schema the aggregate covers.
pub schema_name: String,
/// Number of tables seen in this schema.
pub table_count: i64,
/// Sum of the (non-negative) row estimates of those tables.
pub estimated_rows: i64,
}
/// Histogram bucket counting tables whose estimate falls in a row-count range.
#[derive(Debug, Clone, Serialize)]
pub struct TableRowEstimateBucket {
/// Human-readable range label, e.g. `1K-9.9K` or `10M+`.
pub label: String,
/// Inclusive lower bound of the range.
pub min_rows: i64,
/// Inclusive upper bound of the range; `None` for the open-ended top bucket.
pub max_rows: Option<i64>,
/// Number of tables whose estimate lies within the range.
pub table_count: i64,
}
/// Headline totals for a table row estimate snapshot.
#[derive(Debug, Clone, Serialize)]
pub struct TableRowEstimateSummary {
/// Total number of tables in the snapshot input.
pub table_count: i64,
/// Number of tables with an estimate greater than zero.
pub nonzero_table_count: i64,
/// Saturating sum of all non-negative estimates.
pub estimated_total_rows: i64,
/// Largest single-table estimate (0 when there is none).
pub max_estimated_rows: i64,
/// Unqualified name of the table holding the largest estimate; `None` when
/// no table was selected (for example, empty input).
pub max_table_name: Option<String>,
/// `schema.table` name of the same table; `None` under the same conditions.
pub max_qualified_table_name: Option<String>,
}
/// Operator-facing warning derived from the row estimates.
///
/// Instances are created by `build_table_row_estimate_advisories`.
#[derive(Debug, Clone, Serialize)]
pub struct TableRowEstimateAdvisory {
/// Stable machine-readable identifier; this file emits
/// `gateway-log-retention`, `log-like-table-growth` and `large-table-pressure`.
pub code: String,
/// Severity label; this file emits `warn` or `critical`.
pub severity: String,
/// Short human-readable headline.
pub title: String,
/// Longer explanation with the suggested follow-up.
pub message: String,
/// Tables that triggered the advisory (the builder lists at most five).
pub affected_tables: Vec<TableRowEstimateEntry>,
/// Saturating sum of the estimates of `affected_tables`.
pub affected_estimated_rows: i64,
/// `affected_estimated_rows` as a percentage of the snapshot total, rounded
/// to one decimal place; `None` when either value is not positive.
pub share_of_estimated_total_pct: Option<f64>,
/// Configuration key the advisory points at, when there is one.
pub config_key: Option<String>,
/// Default value, in seconds, reported for `config_key`, when there is one.
pub config_default_seconds: Option<i64>,
}
/// Complete, serializable report built by `build_table_row_estimate_snapshot`.
#[derive(Debug, Clone, Serialize)]
pub struct TableRowEstimateSnapshot {
/// Where the estimates come from; the builder sets `pg_class.reltuples`.
pub source: String,
/// UTC time at which the snapshot was assembled.
pub collected_at: DateTime<Utc>,
/// Headline totals across all tables.
pub summary: TableRowEstimateSummary,
/// The leading tables, limited by the builder's `top_limit` argument.
pub top_tables: Vec<TableRowEstimateEntry>,
/// Per-schema aggregates, largest estimated row total first.
pub schema_rollups: Vec<TableRowEstimateSchemaRollup>,
/// Histogram of tables by estimated row-count range.
pub buckets: Vec<TableRowEstimateBucket>,
/// Warnings derived from the estimates; may be empty.
pub advisories: Vec<TableRowEstimateAdvisory>,
}
/// Filters accepted by `collect_table_row_estimates`.
#[derive(Debug, Clone)]
pub struct TableRowEstimateQuery {
/// When set, only tables in this schema are returned.
pub schema_name: Option<String>,
/// When `false`, `pg_catalog`, `information_schema`, `pg_toast` and the
/// `pg_temp_*` / `pg_toast_temp_*` schemas are excluded from the result.
pub include_system_schemas: bool,
}
/// Fetches per-table row estimates from the PostgreSQL catalogs.
///
/// Reads `pg_class.reltuples` for ordinary tables (`relkind = 'r'`), clamps
/// negative estimates to zero in SQL, and returns the rows ordered by estimate
/// (largest first), then by schema and table name.
///
/// * `query.schema_name` — when set, restricts the result to that schema.
/// * `query.include_system_schemas` — when `false`, `pg_catalog`,
///   `information_schema`, `pg_toast` and the `pg_temp_*` / `pg_toast_temp_*`
///   schemas are filtered out.
///
/// # Errors
///
/// Returns the [`sqlx::Error`] produced while executing the query.
pub async fn collect_table_row_estimates(
    target_pool: &PgPool,
    query: &TableRowEstimateQuery,
) -> Result<Vec<TableRowEstimateCatalogRow>, Error> {
    let schema_filter = query.schema_name.as_deref();

    // Select the statement text up front; binding and execution are shared
    // below. Only the schema-filtered statements contain the `$1` placeholder.
    let sql: &'static str = match (schema_filter.is_some(), query.include_system_schemas) {
        (true, true) => {
            r#"
SELECT
n.nspname AS schema_name,
c.relname AS table_name,
GREATEST(c.reltuples, 0)::bigint AS estimated_rows
FROM pg_class c
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE c.relkind = 'r'
AND n.nspname = $1
ORDER BY estimated_rows DESC, schema_name, table_name
"#
        }
        (true, false) => {
            r#"
SELECT
n.nspname AS schema_name,
c.relname AS table_name,
GREATEST(c.reltuples, 0)::bigint AS estimated_rows
FROM pg_class c
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE c.relkind = 'r'
AND n.nspname = $1
AND n.nspname NOT IN ('pg_catalog', 'information_schema', 'pg_toast')
AND n.nspname NOT LIKE 'pg_temp_%'
AND n.nspname NOT LIKE 'pg_toast_temp_%'
ORDER BY estimated_rows DESC, schema_name, table_name
"#
        }
        (false, true) => {
            r#"
SELECT
n.nspname AS schema_name,
c.relname AS table_name,
GREATEST(c.reltuples, 0)::bigint AS estimated_rows
FROM pg_class c
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE c.relkind = 'r'
ORDER BY estimated_rows DESC, schema_name, table_name
"#
        }
        (false, false) => {
            r#"
SELECT
n.nspname AS schema_name,
c.relname AS table_name,
GREATEST(c.reltuples, 0)::bigint AS estimated_rows
FROM pg_class c
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE c.relkind = 'r'
AND n.nspname NOT IN ('pg_catalog', 'information_schema', 'pg_toast')
AND n.nspname NOT LIKE 'pg_temp_%'
AND n.nspname NOT LIKE 'pg_toast_temp_%'
ORDER BY estimated_rows DESC, schema_name, table_name
"#
        }
    };

    let mut statement = sqlx::query_as::<_, TableRowEstimateCatalogRow>(sql);
    if let Some(schema_name) = schema_filter {
        statement = statement.bind(schema_name);
    }
    statement.fetch_all(target_pool).await
}
pub fn build_table_row_estimate_snapshot(
rows: &[TableRowEstimateCatalogRow],
top_limit: usize,
) -> TableRowEstimateSnapshot {
let capped_limit: usize = top_limit.max(1);
let collected_at: DateTime<Utc> = Utc::now();
let source = "pg_class.reltuples".to_string();
let top_tables: Vec<TableRowEstimateEntry> = rows
.iter()
.take(capped_limit)
.map(table_row_estimate_entry)
.collect();
let mut estimated_total_rows: i64 = 0;
let mut nonzero_table_count: i64 = 0;
let mut max_estimated_rows: i64 = 0;
let mut max_table_name: Option<String> = None;
let mut max_qualified_table_name: Option<String> = None;
let mut schema_rollups: BTreeMap<String, (i64, i64)> = BTreeMap::new();
for row in rows {
estimated_total_rows = estimated_total_rows.saturating_add(row.estimated_rows.max(0));
if row.estimated_rows > 0 {
nonzero_table_count += 1;
}
if row.estimated_rows >= max_estimated_rows {
max_estimated_rows = row.estimated_rows;
max_table_name = Some(row.table_name.clone());
max_qualified_table_name = Some(format!("{}.{}", row.schema_name, row.table_name));
}
let entry = schema_rollups
.entry(row.schema_name.clone())
.or_insert((0, 0));
entry.0 += 1;
entry.1 = entry.1.saturating_add(row.estimated_rows.max(0));
}
let mut schema_rollups: Vec<TableRowEstimateSchemaRollup> = schema_rollups
.into_iter()
.map(
|(schema_name, (table_count, estimated_rows))| TableRowEstimateSchemaRollup {
schema_name,
table_count,
estimated_rows,
},
)
.collect();
schema_rollups.sort_by(|left, right| {
right
.estimated_rows
.cmp(&left.estimated_rows)
.then_with(|| left.schema_name.cmp(&right.schema_name))
});
let summary = TableRowEstimateSummary {
table_count: i64::try_from(rows.len()).unwrap_or(i64::MAX),
nonzero_table_count,
estimated_total_rows,
max_estimated_rows,
max_table_name,
max_qualified_table_name,
};
let advisories = build_table_row_estimate_advisories(rows, estimated_total_rows);
TableRowEstimateSnapshot {
source,
collected_at,
summary,
top_tables,
schema_rollups,
buckets: build_estimate_buckets(rows),
advisories,
}
}
/// Counts how many tables fall into each fixed row-count range.
///
/// Always returns the seven buckets in ascending order, including empty ones.
/// A row whose estimate matches no range (a negative value) is not counted.
fn build_estimate_buckets(rows: &[TableRowEstimateCatalogRow]) -> Vec<TableRowEstimateBucket> {
    // (label, inclusive lower bound, inclusive upper bound or open-ended)
    const RANGES: [(&str, i64, Option<i64>); 7] = [
        ("0", 0, Some(0)),
        ("1-999", 1, Some(999)),
        ("1K-9.9K", 1_000, Some(9_999)),
        ("10K-99.9K", 10_000, Some(99_999)),
        ("100K-999.9K", 100_000, Some(999_999)),
        ("1M-9.9M", 1_000_000, Some(9_999_999)),
        ("10M+", 10_000_000, None),
    ];

    // The ranges are disjoint, so each row lands in at most one slot.
    let mut tallies = [0usize; 7];
    for row in rows {
        let slot = RANGES
            .iter()
            .position(|&(_, low, high)| row_matches_bucket(row.estimated_rows, low, high));
        if let Some(index) = slot {
            tallies[index] += 1;
        }
    }

    RANGES
        .iter()
        .zip(tallies.iter())
        .map(|(&(label, min_rows, max_rows), &tally)| TableRowEstimateBucket {
            label: label.to_string(),
            min_rows,
            max_rows,
            table_count: i64::try_from(tally).unwrap_or(i64::MAX),
        })
        .collect()
}
fn table_row_estimate_entry(row: &TableRowEstimateCatalogRow) -> TableRowEstimateEntry {
TableRowEstimateEntry {
schema_name: row.schema_name.clone(),
table_name: row.table_name.clone(),
qualified_table_name: qualified_table_name(&row.schema_name, &row.table_name),
estimated_rows: row.estimated_rows,
}
}
/// Joins a schema and table name with a dot, e.g. `public.orders`.
///
/// The inputs are concatenated as-is; no quoting or escaping is applied.
fn qualified_table_name(schema_name: &str, table_name: &str) -> String {
    let mut qualified = String::with_capacity(schema_name.len() + 1 + table_name.len());
    qualified.push_str(schema_name);
    qualified.push('.');
    qualified.push_str(table_name);
    qualified
}
/// Returns `true` for the gateway log tables in the `public` schema:
/// `gateway_request_log` and `gateway_operation_log`.
fn is_gateway_log_table(row: &TableRowEstimateCatalogRow) -> bool {
    const GATEWAY_LOG_TABLES: [&str; 2] = ["gateway_request_log", "gateway_operation_log"];

    if row.schema_name != "public" {
        return false;
    }
    GATEWAY_LOG_TABLES.contains(&row.table_name.as_str())
}
/// Returns `true` when the table name ends with one of the log-style suffixes.
///
/// The match is case-sensitive and looks only at the table name, not the schema.
fn is_log_like_table(row: &TableRowEstimateCatalogRow) -> bool {
    // `_audit_log` is already covered by `_log`; it stays listed so the
    // intended naming conventions remain explicit.
    const LOG_SUFFIXES: [&str; 3] = ["_log", "_logs", "_audit_log"];

    LOG_SUFFIXES
        .iter()
        .any(|&suffix| row.table_name.ends_with(suffix))
}
/// Expresses `numerator / denominator` as a percentage rounded to one decimal.
///
/// Returns `None` unless both arguments are strictly positive.
fn round_percentage(numerator: i64, denominator: i64) -> Option<f64> {
    let both_positive = numerator > 0 && denominator > 0;
    both_positive.then(|| {
        let pct = (numerator as f64 * 100.0) / denominator as f64;
        // Scale, round to the nearest integer, and scale back: one decimal place.
        (pct * 10.0).round() / 10.0
    })
}
/// Derives storage advisories from the row estimates.
///
/// Up to three advisories are produced, in this order:
///
/// 1. `gateway-log-retention` — gateway log tables with a positive estimate.
///    Severity is `critical` when their combined estimate reaches
///    `LARGE_TABLE_PRESSURE_THRESHOLD` or at least 50% of
///    `estimated_total_rows`, otherwise `warn`.
/// 2. `log-like-table-growth` — other log-like tables with at least
///    `LOG_LIKE_TABLE_MIN_ROWS` estimated rows (`warn`).
/// 3. `large-table-pressure` — any table at or above
///    `LARGE_TABLE_PRESSURE_THRESHOLD` (`critical`).
///
/// Each advisory lists at most five tables, chosen as the largest matches no
/// matter how `rows` is ordered; equal estimates keep their input order.
/// `affected_estimated_rows` and the share percentage cover the listed tables
/// only.
///
/// * `rows` — per-table estimates to inspect.
/// * `estimated_total_rows` — denominator for the share percentages.
fn build_table_row_estimate_advisories(
    rows: &[TableRowEstimateCatalogRow],
    estimated_total_rows: i64,
) -> Vec<TableRowEstimateAdvisory> {
    /// Maximum number of tables listed on one advisory.
    const MAX_AFFECTED_TABLES: usize = 5;

    /// Returns the largest matching tables (capped at `MAX_AFFECTED_TABLES`)
    /// and the saturating sum of their non-negative estimates.
    fn largest_affected(
        rows: &[TableRowEstimateCatalogRow],
        is_affected: impl Fn(&TableRowEstimateCatalogRow) -> bool,
    ) -> (Vec<TableRowEstimateEntry>, i64) {
        let mut matches: Vec<&TableRowEstimateCatalogRow> =
            rows.iter().filter(|&row| is_affected(row)).collect();
        // Stable sort: pre-sorted input keeps its order; unsorted input still
        // surfaces the biggest tables before the cap is applied.
        matches.sort_by(|left, right| right.estimated_rows.cmp(&left.estimated_rows));
        matches.truncate(MAX_AFFECTED_TABLES);

        let estimated_rows = matches.iter().fold(0_i64, |acc, row| {
            acc.saturating_add(row.estimated_rows.max(0))
        });
        let entries = matches.into_iter().map(table_row_estimate_entry).collect();
        (entries, estimated_rows)
    }

    let mut advisories = Vec::new();

    let (gateway_log_tables, gateway_log_estimated_rows) = largest_affected(rows, |row| {
        is_gateway_log_table(row) && row.estimated_rows > 0
    });
    if !gateway_log_tables.is_empty() {
        let share_of_total = round_percentage(gateway_log_estimated_rows, estimated_total_rows);
        let severity = if gateway_log_estimated_rows >= LARGE_TABLE_PRESSURE_THRESHOLD
            || share_of_total.is_some_and(|share| share >= 50.0)
        {
            "critical"
        } else {
            "warn"
        };
        advisories.push(TableRowEstimateAdvisory {
            code: "gateway-log-retention".to_string(),
            severity: severity.to_string(),
            title: "Gateway log tables are likely driving storage pressure".to_string(),
            message: "Athena writes request and operation telemetry into these tables. If disk-full errors reference them, tighten `gateway.log_retention_secs` or prune old rows before inserts stall.".to_string(),
            affected_tables: gateway_log_tables,
            affected_estimated_rows: gateway_log_estimated_rows,
            share_of_estimated_total_pct: share_of_total,
            config_key: Some("gateway.log_retention_secs".to_string()),
            config_default_seconds: Some(DEFAULT_GATEWAY_LOG_RETENTION_SECS),
        });
    }

    let (other_log_like_tables, other_log_like_estimated_rows) = largest_affected(rows, |row| {
        !is_gateway_log_table(row)
            && is_log_like_table(row)
            && row.estimated_rows >= LOG_LIKE_TABLE_MIN_ROWS
    });
    if !other_log_like_tables.is_empty() {
        advisories.push(TableRowEstimateAdvisory {
            code: "log-like-table-growth".to_string(),
            severity: "warn".to_string(),
            title: "Other log-like tables are accumulating".to_string(),
            message: "These tables look log-shaped by name and are already large enough to matter for backups, vacuum, and disk headroom. Review retention or archiving if they keep growing.".to_string(),
            affected_tables: other_log_like_tables,
            affected_estimated_rows: other_log_like_estimated_rows,
            share_of_estimated_total_pct: round_percentage(
                other_log_like_estimated_rows,
                estimated_total_rows,
            ),
            config_key: None,
            config_default_seconds: None,
        });
    }

    let (very_large_tables, very_large_estimated_rows) = largest_affected(rows, |row| {
        row.estimated_rows >= LARGE_TABLE_PRESSURE_THRESHOLD
    });
    if !very_large_tables.is_empty() {
        advisories.push(TableRowEstimateAdvisory {
            code: "large-table-pressure".to_string(),
            severity: "critical".to_string(),
            title: "Very large tables need headroom".to_string(),
            message: "Tables at 10M+ estimated rows are the first places to look when `No space left on device` starts blocking inserts, vacuum, or stats refreshes.".to_string(),
            affected_tables: very_large_tables,
            affected_estimated_rows: very_large_estimated_rows,
            share_of_estimated_total_pct: round_percentage(
                very_large_estimated_rows,
                estimated_total_rows,
            ),
            config_key: None,
            config_default_seconds: None,
        });
    }

    advisories
}
/// Returns `true` when `value` lies in the inclusive range `min_rows..=max_rows`.
///
/// A `max_rows` of `None` means the range has no upper bound.
fn row_matches_bucket(value: i64, min_rows: i64, max_rows: Option<i64>) -> bool {
    let at_or_above_floor = value >= min_rows;
    let within_ceiling = max_rows.map_or(true, |ceiling| value <= ceiling);
    at_or_above_floor && within_ceiling
}
#[cfg(test)]
mod tests {
    use super::{
        TableRowEstimateCatalogRow, build_table_row_estimate_snapshot, qualified_table_name,
        row_matches_bucket,
    };

    /// Builds a catalog row fixture without repeating the struct literal.
    fn catalog_row(
        schema_name: &str,
        table_name: &str,
        estimated_rows: i64,
    ) -> TableRowEstimateCatalogRow {
        TableRowEstimateCatalogRow {
            schema_name: schema_name.to_string(),
            table_name: table_name.to_string(),
            estimated_rows,
        }
    }

    #[test]
    fn row_matches_bucket_handles_open_ended_ranges() {
        let floor = 10_000_000;
        // At and above the floor both match when there is no ceiling.
        assert!(row_matches_bucket(floor, floor, None));
        assert!(row_matches_bucket(25_000_000, floor, None));
        // One below the floor does not.
        assert!(!row_matches_bucket(9_999_999, floor, None));
    }

    #[test]
    fn snapshot_rolls_up_top_tables_schema_totals_and_buckets() {
        let rows = vec![
            catalog_row("public", "orders", 2_500_000),
            catalog_row("public", "events", 250_000),
            catalog_row("analytics", "sessions", 0),
        ];

        let snapshot = build_table_row_estimate_snapshot(&rows, 2);

        let summary = &snapshot.summary;
        assert_eq!(summary.table_count, 3);
        assert_eq!(summary.nonzero_table_count, 2);
        assert_eq!(summary.estimated_total_rows, 2_750_000);
        assert_eq!(
            summary.max_qualified_table_name.as_deref(),
            Some("public.orders")
        );

        assert_eq!(snapshot.top_tables.len(), 2);
        assert_eq!(snapshot.top_tables[0].qualified_table_name, "public.orders");

        assert_eq!(snapshot.schema_rollups.len(), 2);
        let leading_schema = &snapshot.schema_rollups[0];
        assert_eq!(leading_schema.schema_name, "public");
        assert_eq!(leading_schema.estimated_rows, 2_750_000);

        let tables_in_bucket = |label: &str| {
            snapshot
                .buckets
                .iter()
                .find(|bucket| bucket.label == label)
                .map(|bucket| bucket.table_count)
        };
        assert_eq!(tables_in_bucket("0"), Some(1));
        assert_eq!(tables_in_bucket("1M-9.9M"), Some(1));

        assert!(snapshot.advisories.is_empty());
    }

    #[test]
    fn snapshot_marks_gateway_logs_as_storage_pressure_advisory() {
        let rows = vec![
            catalog_row("public", "gateway_request_log", 12_500_000),
            catalog_row("public", "gateway_operation_log", 3_500_000),
            catalog_row("public", "orders", 250_000),
        ];

        let snapshot = build_table_row_estimate_snapshot(&rows, 3);
        let advisory = snapshot
            .advisories
            .iter()
            .find(|entry| entry.code == "gateway-log-retention")
            .expect("gateway log advisory");

        assert_eq!(advisory.severity, "critical");
        assert_eq!(
            advisory.config_key.as_deref(),
            Some("gateway.log_retention_secs")
        );
        assert_eq!(advisory.config_default_seconds, Some(1_209_600));
        assert_eq!(advisory.affected_tables.len(), 2);
        assert_eq!(
            advisory.affected_tables[0].qualified_table_name,
            qualified_table_name("public", "gateway_request_log")
        );
    }

    #[test]
    fn snapshot_marks_other_log_like_tables() {
        let rows = vec![
            catalog_row("public", "database_audit_log", 500_000),
            catalog_row("public", "orders", 250_000),
        ];

        let snapshot = build_table_row_estimate_snapshot(&rows, 2);
        let advisory = snapshot
            .advisories
            .iter()
            .find(|entry| entry.code == "log-like-table-growth")
            .expect("log-like table advisory");

        assert_eq!(advisory.severity, "warn");
        assert_eq!(advisory.affected_tables.len(), 1);
        assert_eq!(
            advisory.affected_tables[0].qualified_table_name,
            qualified_table_name("public", "database_audit_log")
        );
    }
}