use std::io::Write;
use serde::Serialize;
use tokio_postgres::error::SqlState;
use djogi::__bypass::RawAccessExt as _;
use djogi::DjogiError;
use djogi::config::DjogiConfig;
use djogi::context::DjogiContext;
use djogi::pg::pool::DjogiPool;
/// Point-in-time statistics for one user table.
///
/// Instances are produced by `fetch_table_health` (one per row of
/// `pg_stat_user_tables`) and classified by `recommend`.
#[derive(Debug, Clone, Serialize)]
pub struct TableHealth {
/// Schema-qualified name, built in SQL as `schemaname || '.' || relname`.
pub table_name: String,
/// Value of the `n_live_tup` column of `pg_stat_user_tables`.
pub n_live_tup: i64,
/// Value of the `n_dead_tup` column of `pg_stat_user_tables`.
pub n_dead_tup: i64,
/// Value of the `last_analyze` column; `None` when the column is NULL.
pub last_analyze: Option<time::OffsetDateTime>,
/// Row count of `partman.show_partitions(table_name)`; `0` when the
/// partman function is not installed or the lookup reports it as absent.
pub partition_count: i32,
}
/// Outcome of `recommend` for a single table.
///
/// Serialized as an internally tagged enum: a `kind` field carries the
/// snake_case variant name (e.g. `vacuum_needed`, `healthy`) next to the
/// variant's own fields.
#[derive(Debug, Clone, PartialEq, Serialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum Recommendation {
/// The share of dead tuples exceeded the vacuum threshold.
VacuumNeeded {
/// `n_dead_tup / (n_live_tup + n_dead_tup)` as computed by `recommend`.
dead_tup_ratio: f64,
},
/// The table has no partitions and more live rows than the row threshold.
PartitionRecommended {
/// Human-readable explanation including the live-row count and threshold.
reason: String,
},
/// The table is partitioned but the average live rows per partition
/// exceeds the row threshold.
PartitionCountIncrease {
/// Partition count observed now.
current: i32,
/// Proposed partition count (`current` doubled, saturating at `i32::MAX`).
suggested: i32,
},
/// None of the checks above fired.
Healthy,
}
/// Classifies one table's statistics into a single [`Recommendation`].
///
/// Checks run in priority order and the first hit wins:
///
/// 1. `VacuumNeeded` when `dead / (live + dead)` is strictly greater than
///    `threshold_vacuum` (skipped when the table has no tuples at all).
/// 2. `PartitionRecommended` when the table has zero partitions and strictly
///    more than `threshold_partition_rows` live rows.
/// 3. `PartitionCountIncrease` when the table has partitions and the integer
///    average of live rows per partition is strictly greater than
///    `threshold_partition_rows`; the suggestion doubles the current count.
/// 4. `Healthy` otherwise (including a negative `partition_count`).
pub fn recommend(
    health: &TableHealth,
    threshold_vacuum: f64,
    threshold_partition_rows: i64,
) -> Recommendation {
    let live = health.n_live_tup;
    let dead = health.n_dead_tup;

    // Saturating add: two huge counters clamp to i64::MAX instead of wrapping.
    let all_tuples = live.saturating_add(dead);
    if all_tuples > 0 {
        let dead_tup_ratio = dead as f64 / all_tuples as f64;
        if dead_tup_ratio > threshold_vacuum {
            return Recommendation::VacuumNeeded { dead_tup_ratio };
        }
    }

    match health.partition_count {
        0 if live > threshold_partition_rows => Recommendation::PartitionRecommended {
            reason: format!(
                "table has {} live rows but is not partitioned (threshold: {})",
                live, threshold_partition_rows
            ),
        },
        // Integer division: the average is truncated toward zero before comparing.
        parts if parts > 0 && live / i64::from(parts) > threshold_partition_rows => {
            Recommendation::PartitionCountIncrease {
                current: parts,
                // Saturating so a count near i32::MAX cannot overflow.
                suggested: parts.saturating_mul(2),
            }
        }
        _ => Recommendation::Healthy,
    }
}
/// Everything that can go wrong while running the analyze command.
///
/// The `Display` impl appends a remediation hint to each variant.
#[derive(Debug)]
pub enum AnalyzeError {
/// Loading the workspace configuration failed; carries the loader's message.
Config(String),
/// Connecting to the database, or the Postgres version preflight, failed.
Pool {
/// Database URL the connection was attempted against.
url: String,
/// Stringified underlying error.
message: String,
},
/// A catalogue/statistics query or the decoding of one of its columns failed.
Db(String),
/// Writing the rendered report failed.
Io(std::io::Error),
/// Encoding the report as JSON failed.
Json(serde_json::Error),
}
impl std::fmt::Display for AnalyzeError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
AnalyzeError::Config(message) => write!(
f,
"config load: {message}. \
Verify the Djogi.toml workspace path and the [database] section.",
),
AnalyzeError::Pool { url, message } => write!(
f,
"application DB at `{url}` unreachable: {message}. \
Verify Djogi.toml::database.url is reachable and the \
credentials grant CONNECT.",
),
AnalyzeError::Db(message) => write!(
f,
"live-DB query: {message}. \
Verify the app DB is reachable and the role has SELECT \
privilege on pg_stat_user_tables (and on partman.* if \
pg_partman is installed).",
),
AnalyzeError::Io(e) => write!(
f,
"writing analyze output: {e}. \
Check stdout/stderr permissions and the workspace path.",
),
AnalyzeError::Json(e) => write!(
f,
"encoding analyze output as JSON: {e}. \
This is an internal bug — please file an issue with the \
input that triggered it.",
),
}
}
}
impl std::error::Error for AnalyzeError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
AnalyzeError::Io(e) => Some(e),
AnalyzeError::Json(e) => Some(e),
AnalyzeError::Config(_) | AnalyzeError::Pool { .. } | AnalyzeError::Db(_) => None,
}
}
}
impl From<std::io::Error> for AnalyzeError {
fn from(e: std::io::Error) -> Self {
AnalyzeError::Io(e)
}
}
impl From<serde_json::Error> for AnalyzeError {
fn from(e: serde_json::Error) -> Self {
AnalyzeError::Json(e)
}
}
/// Output format selected for the analyze report.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AnalyzeFormat {
/// Fixed-width text table (rendered by `render_human`).
Human,
/// Pretty-printed JSON array (rendered by `render_json`).
Json,
}
/// Reads one [`TableHealth`] per row of `pg_stat_user_tables`.
///
/// First probes the catalogue for a `partman.show_partitions` function. When
/// it exists, each table's partition count is looked up through
/// `query_partition_count`; otherwise every table reports `0` partitions.
///
/// # Errors
///
/// Returns [`AnalyzeError::Db`] when either catalogue query fails, when a
/// column cannot be decoded, or when a partition lookup fails for any reason
/// other than partman being absent.
pub async fn fetch_table_health(pool: &DjogiPool) -> Result<Vec<TableHealth>, AnalyzeError> {
    const PARTMAN_PROBE_SQL: &str = "SELECT 1 \
        FROM pg_namespace n \
        JOIN pg_proc p ON p.pronamespace = n.oid \
        WHERE n.nspname = 'partman' AND p.proname = 'show_partitions'";
    const STATS_SQL: &str = "SELECT \
        schemaname || '.' || relname AS table_name, \
        n_live_tup, \
        n_dead_tup, \
        last_analyze \
        FROM pg_stat_user_tables \
        ORDER BY schemaname, relname";

    let mut ctx = DjogiContext::from_pool(pool.clone());

    // Any row from the probe means the partman function is installed.
    let probe_rows = ctx
        .raw_rows(PARTMAN_PROBE_SQL, &[])
        .await
        .map_err(djogi_err_to_analyze)?;
    let partman_present = !probe_rows.is_empty();

    let stats_rows = ctx
        .raw_rows(STATS_SQL, &[])
        .await
        .map_err(djogi_err_to_analyze)?;

    let mut tables = Vec::with_capacity(stats_rows.len());
    for row in stats_rows {
        // Columns are decoded positionally, in the order of the SELECT list.
        let mut entry = TableHealth {
            table_name: row
                .try_get(0)
                .map_err(|e| AnalyzeError::Db(format!("decoding table_name: {e}")))?,
            n_live_tup: row
                .try_get(1)
                .map_err(|e| AnalyzeError::Db(format!("decoding n_live_tup: {e}")))?,
            n_dead_tup: row
                .try_get(2)
                .map_err(|e| AnalyzeError::Db(format!("decoding n_dead_tup: {e}")))?,
            last_analyze: row
                .try_get(3)
                .map_err(|e| AnalyzeError::Db(format!("decoding last_analyze: {e}")))?,
            partition_count: 0,
        };

        if partman_present {
            // NOTE(review): this runs once per table, including tables that may
            // not be partman-managed; confirm `partman.show_partitions` does not
            // raise for those, because any non-"absent" error aborts the fetch.
            entry.partition_count = query_partition_count(&mut ctx, &entry.table_name)
                .await
                .or_else(|err| match err {
                    // Partman vanished between the probe and this call: treat as unpartitioned.
                    PartmanError::Absent => Ok(0),
                    PartmanError::Other(message) => Err(AnalyzeError::Db(message)),
                })?;
        }

        tables.push(entry);
    }
    Ok(tables)
}
/// Failure modes of `query_partition_count`.
enum PartmanError {
/// The database reported a SQLSTATE that `is_partman_absent_code` accepts,
/// i.e. the partman function/table/schema does not exist.
Absent,
/// Any other failure, carried as a rendered message.
Other(String),
}
/// Counts the rows returned by `partman.show_partitions(table_name)`.
///
/// Returns `Ok(0)` when the query yields no row at all.
///
/// # Errors
///
/// * [`PartmanError::Absent`] when the database error carries a SQLSTATE
///   accepted by `is_partman_absent_code`.
/// * [`PartmanError::Other`] for every other query or decoding failure.
async fn query_partition_count(
    ctx: &mut DjogiContext,
    table_name: &str,
) -> Result<i32, PartmanError> {
    let sql = "SELECT count(*)::int FROM partman.show_partitions($1)";

    let rows = match ctx.raw_rows(sql, &[&table_name]).await {
        Ok(rows) => rows,
        Err(DjogiError::Db(db)) => {
            // Only a database error with a recognised SQLSTATE means "partman is missing".
            return Err(match db.code() {
                Some(code) if is_partman_absent_code(code) => PartmanError::Absent,
                _ => PartmanError::Other(db.to_string()),
            });
        }
        Err(other) => return Err(PartmanError::Other(other.to_string())),
    };

    match rows.first() {
        None => Ok(0),
        Some(row) => {
            let count: i32 = row
                .try_get(0)
                .map_err(|e| PartmanError::Other(format!("decoding partition count: {e}")))?;
            Ok(count)
        }
    }
}
/// Returns `true` when `code` is one of the three SQLSTATEs this module
/// treats as "partman is not installed": undefined function, undefined
/// table, or invalid schema name.
fn is_partman_absent_code(code: &SqlState) -> bool {
    [
        SqlState::UNDEFINED_FUNCTION,
        SqlState::UNDEFINED_TABLE,
        SqlState::INVALID_SCHEMA_NAME,
    ]
    .contains(code)
}
/// Flattens a [`DjogiError`] into [`AnalyzeError::Db`], keeping only its
/// rendered message.
fn djogi_err_to_analyze(e: DjogiError) -> AnalyzeError {
    let message = e.to_string();
    AnalyzeError::Db(message)
}
/// Entry point of the analyze command: loads the workspace config, connects
/// to the application database, gathers table statistics, classifies every
/// table with `recommend`, and writes the report to stdout.
///
/// * `workspace` - directory holding the config; defaults to the current
///   directory (or `.` when that cannot be determined).
/// * `format` - human table or JSON.
/// * `threshold_vacuum` / `threshold_partition_rows` - forwarded to `recommend`.
///
/// # Errors
///
/// * [`AnalyzeError::Config`] when the config cannot be loaded.
/// * [`AnalyzeError::Pool`] when connecting or the Postgres version preflight
///   fails. The `url` stored in the error has its password component
///   redacted so credentials do not end up in terminal output or logs.
/// * [`AnalyzeError::Db`], [`AnalyzeError::Io`], [`AnalyzeError::Json`] from
///   fetching and rendering.
pub async fn run(
    workspace: Option<std::path::PathBuf>,
    format: AnalyzeFormat,
    threshold_vacuum: f64,
    threshold_partition_rows: i64,
) -> Result<(), AnalyzeError> {
    let workspace = workspace.unwrap_or_else(|| {
        std::env::current_dir().unwrap_or_else(|_| std::path::PathBuf::from("."))
    });
    let config = DjogiConfig::load_from_workspace(&workspace)
        .map_err(|e| AnalyzeError::Config(e.to_string()))?;
    let url = config.database.url.clone();
    // The real URL is used to connect; only the redacted form may reach an
    // error value, because `AnalyzeError` prints `url` in Display and Debug.
    let display_url = redact_url_password(&url);
    let pool = DjogiPool::connect(&url)
        .await
        .map_err(|e| AnalyzeError::Pool {
            url: display_url.clone(),
            message: e.to_string(),
        })?;
    djogi::pg::preflight::check_postgres_version(&pool)
        .await
        .map_err(|e| AnalyzeError::Pool {
            url: display_url.clone(),
            message: format!("support boundary: {e}"),
        })?;
    let mut health = fetch_table_health(&pool).await?;
    // Sort on the combined `schema.table` string so report order is stable.
    health.sort_by(|a, b| a.table_name.cmp(&b.table_name));
    let report: Vec<(TableHealth, Recommendation)> = health
        .into_iter()
        .map(|h| {
            let rec = recommend(&h, threshold_vacuum, threshold_partition_rows);
            (h, rec)
        })
        .collect();
    // Lock stdout once for the whole report instead of per line.
    let stdout = std::io::stdout();
    let mut handle = stdout.lock();
    match format {
        AnalyzeFormat::Human => render_human(&report, &mut handle)?,
        AnalyzeFormat::Json => render_json(&report, &mut handle)?,
    }
    handle.flush()?;
    Ok(())
}

/// Returns `url` with the password of a `scheme://user:password@host/...`
/// connection URI replaced by `***`.
///
/// Input that is not in that shape (no `://`, no `@` in the authority, or no
/// `:` in the user-info part) is returned unchanged; in particular
/// keyword/value connection strings are not rewritten.
fn redact_url_password(url: &str) -> String {
    let Some(scheme_end) = url.find("://") else {
        return url.to_string();
    };
    let authority_start = scheme_end + "://".len();
    let rest = &url[authority_start..];
    // The authority stops at the first path, query, or fragment delimiter.
    let authority_end = rest.find(['/', '?', '#']).unwrap_or(rest.len());
    let authority = &rest[..authority_end];
    // User-info is everything before the last '@' of the authority.
    let Some(at) = authority.rfind('@') else {
        return url.to_string();
    };
    let userinfo = &authority[..at];
    let Some(colon) = userinfo.find(':') else {
        return url.to_string();
    };
    format!(
        "{}{}:***{}",
        &url[..authority_start],
        &userinfo[..colon],
        &rest[at..]
    )
}
/// Writes the report as a fixed-width text table: one header line followed
/// by one line per table, in the order given.
///
/// Column layout: table name left-aligned to 48, live and dead tuple counts
/// right-aligned to 14, partition count right-aligned to 10, then the
/// free-form recommendation text.
///
/// # Errors
///
/// Returns [`AnalyzeError::Io`] when a write to `out` fails.
fn render_human<W: Write>(
    report: &[(TableHealth, Recommendation)],
    out: &mut W,
) -> Result<(), AnalyzeError> {
    // Header uses the same column spec as the data rows below.
    writeln!(
        out,
        "{:<48} {:>14} {:>14} {:>10} {}",
        "TABLE", "LIVE", "DEAD", "PARTITIONS", "RECOMMENDATION",
    )?;

    report.iter().try_for_each(|(health, rec)| {
        writeln!(
            out,
            "{:<48} {:>14} {:>14} {:>10} {}",
            health.table_name,
            health.n_live_tup,
            health.n_dead_tup,
            health.partition_count,
            recommendation_human(rec),
        )
    })?;

    Ok(())
}
/// Renders a recommendation as the short text shown in the last column of
/// the human-readable report. The dead-tuple ratio is printed with four
/// decimal places.
fn recommendation_human(r: &Recommendation) -> String {
    match r {
        Recommendation::Healthy => String::from("healthy"),
        Recommendation::VacuumNeeded {
            dead_tup_ratio: ratio,
        } => format!("vacuum (dead_tup_ratio={ratio:.4})"),
        Recommendation::PartitionRecommended { reason: why } => format!("partition ({why})"),
        Recommendation::PartitionCountIncrease {
            current: now,
            suggested: target,
        } => format!("parts++ (current={now}, suggested={target})"),
    }
}
/// Writes the report as a pretty-printed JSON array followed by a newline.
///
/// Each element is an object whose keys appear in the declaration order of
/// the local row struct: `table_name`, `n_live_tup`, `n_dead_tup`,
/// `last_analyze`, `partition_count`, `recommendation`.
///
/// # Errors
///
/// Returns [`AnalyzeError::Json`] when encoding fails and
/// [`AnalyzeError::Io`] when the trailing newline cannot be written.
fn render_json<W: Write>(
    report: &[(TableHealth, Recommendation)],
    out: &mut W,
) -> Result<(), AnalyzeError> {
    // Borrowing view that flattens each (health, recommendation) pair into
    // one JSON object; field order here is the key order in the output.
    #[derive(Serialize)]
    struct JsonRow<'a> {
        table_name: &'a str,
        n_live_tup: i64,
        n_dead_tup: i64,
        last_analyze: Option<time::OffsetDateTime>,
        partition_count: i32,
        recommendation: &'a Recommendation,
    }

    let mut rows: Vec<JsonRow<'_>> = Vec::with_capacity(report.len());
    for (health, recommendation) in report {
        rows.push(JsonRow {
            table_name: health.table_name.as_str(),
            n_live_tup: health.n_live_tup,
            n_dead_tup: health.n_dead_tup,
            last_analyze: health.last_analyze,
            partition_count: health.partition_count,
            recommendation,
        });
    }

    serde_json::to_writer_pretty(&mut *out, &rows)?;
    // The pretty printer does not end with a newline; add one.
    out.write_all(b"\n")?;
    Ok(())
}
// Unit tests for the pure parts of this module: the `recommend` classifier,
// the two renderers, the SQLSTATE predicate, and the error messages. None of
// them touches a database.
#[cfg(test)]
mod tests {
use super::*;
/// Builds a `TableHealth` named `test_table` with no `last_analyze`.
fn health(n_live_tup: i64, n_dead_tup: i64, partition_count: i32) -> TableHealth {
TableHealth {
table_name: "test_table".to_string(),
n_live_tup,
n_dead_tup,
last_analyze: None,
partition_count,
}
}
/// Small, partitioned-but-small, and completely empty tables are all healthy.
#[test]
fn recommend_healthy_when_below_all_thresholds() {
let h = health(1_000, 0, 0);
assert_eq!(recommend(&h, 0.2, 10_000_000), Recommendation::Healthy);
let h = health(100_000, 0, 4);
assert_eq!(recommend(&h, 0.2, 10_000_000), Recommendation::Healthy);
// Zero tuples: the ratio check is skipped entirely.
let h = health(0, 0, 0);
assert_eq!(recommend(&h, 0.2, 10_000_000), Recommendation::Healthy);
}
/// The vacuum check is a strict `>` on the dead-tuple ratio.
#[test]
fn recommend_vacuum_when_dead_tup_ratio_high() {
// 21% dead: just above the 20% threshold.
let h = health(79, 21, 0);
match recommend(&h, 0.2, 10_000_000) {
Recommendation::VacuumNeeded { dead_tup_ratio } => {
assert!(
dead_tup_ratio > 0.20 && dead_tup_ratio < 0.22,
"expected ratio near 0.21, got {dead_tup_ratio}"
);
}
other => panic!("expected VacuumNeeded, got {other:?}"),
}
// 19% dead: below the threshold.
let h = health(81, 19, 0);
assert_eq!(recommend(&h, 0.2, 10_000_000), Recommendation::Healthy);
// Exactly 20% dead: equal to the threshold does not trigger.
let h = health(80, 20, 0);
assert_eq!(recommend(&h, 0.2, 10_000_000), Recommendation::Healthy);
let h = health(50, 50, 0);
match recommend(&h, 0.2, 10_000_000) {
Recommendation::VacuumNeeded { dead_tup_ratio } => {
assert!((dead_tup_ratio - 0.5).abs() < 1e-9);
}
other => panic!("expected VacuumNeeded, got {other:?}"),
}
}
/// An unpartitioned table is flagged only when strictly above the row threshold.
#[test]
fn recommend_partition_when_unpartitioned_and_large() {
let h = health(10_000_001, 0, 0);
match recommend(&h, 0.2, 10_000_000) {
Recommendation::PartitionRecommended { reason } => {
assert!(reason.contains("10000001"), "reason: {reason}");
assert!(reason.contains("not partitioned"), "reason: {reason}");
assert!(reason.contains("threshold: 10000000"), "reason: {reason}");
}
other => panic!("expected PartitionRecommended, got {other:?}"),
}
// Exactly at the threshold: not flagged.
let h = health(10_000_000, 0, 0);
assert_eq!(recommend(&h, 0.2, 10_000_000), Recommendation::Healthy);
// Already partitioned with a small per-partition average: healthy.
let h = health(20_000_000, 0, 100);
assert_eq!(recommend(&h, 0.2, 10_000_000), Recommendation::Healthy);
}
/// A partitioned table whose average rows per partition exceeds the
/// threshold gets a doubled partition count, saturating at `i32::MAX`.
#[test]
fn recommend_partition_count_increase_when_partitions_undersized() {
let h = health(100_000_000, 0, 4);
assert_eq!(
recommend(&h, 0.2, 10_000_000),
Recommendation::PartitionCountIncrease {
current: 4,
suggested: 8,
}
);
// Doubling i32::MAX must saturate rather than overflow.
let h = health(i64::MAX / 2, 0, i32::MAX);
match recommend(&h, 0.2, 10_000_000) {
Recommendation::PartitionCountIncrease { current, suggested } => {
assert_eq!(current, i32::MAX);
assert_eq!(suggested, i32::MAX); }
other => panic!("expected PartitionCountIncrease, got {other:?}"),
}
}
/// Repeated calls with the same input return the same recommendation.
#[test]
fn recommend_is_deterministic() {
let h = health(50_000_000, 0, 3);
let baseline = recommend(&h, 0.2, 10_000_000);
for i in 0..100 {
let result = recommend(&h, 0.2, 10_000_000);
assert_eq!(
result, baseline,
"iteration {i} diverged from baseline {baseline:?}"
);
}
let h = health(70, 30, 0);
let baseline = recommend(&h, 0.2, 10_000_000);
for i in 0..100 {
assert_eq!(
recommend(&h, 0.2, 10_000_000),
baseline,
"vacuum iteration {i} diverged"
);
}
}
/// When both the vacuum and partition conditions hold, vacuum wins.
#[test]
fn recommend_vacuum_dominates_partition() {
let h = health(100_000_000, 50_000_000, 0);
match recommend(&h, 0.2, 10_000_000) {
Recommendation::VacuumNeeded { dead_tup_ratio } => {
assert!((dead_tup_ratio - (50.0 / 150.0)).abs() < 1e-9);
}
other => panic!("expected VacuumNeeded (precedence), got {other:?}"),
}
}
/// The same large row count yields `PartitionCountIncrease` for a
/// partitioned table and `PartitionRecommended` for an unpartitioned one.
#[test]
fn recommend_partition_dominates_count_increase() {
let h = health(100_000_000, 0, 4); match recommend(&h, 0.2, 10_000_000) {
Recommendation::PartitionCountIncrease { current, suggested } => {
assert_eq!(current, 4);
assert_eq!(suggested, 8);
}
other => panic!("expected PartitionCountIncrease, got {other:?}"),
}
let h = health(100_000_000, 0, 0);
assert!(matches!(
recommend(&h, 0.2, 10_000_000),
Recommendation::PartitionRecommended { .. }
));
}
/// `live + dead` at `i64::MAX` each must not overflow; the saturated total
/// gives a ratio of 1.0.
#[test]
fn recommend_handles_n_tup_addition_overflow() {
let h = TableHealth {
table_name: "boom".to_string(),
n_live_tup: i64::MAX,
n_dead_tup: i64::MAX,
last_analyze: None,
partition_count: 0,
};
let result = recommend(&h, 0.2, 10_000_000);
match result {
Recommendation::VacuumNeeded { dead_tup_ratio } => {
assert!(
(dead_tup_ratio - 1.0).abs() < 1e-9,
"expected ratio 1.0, got {dead_tup_ratio}"
);
}
other => panic!("expected VacuumNeeded, got {other:?}"),
}
}
/// Four-row report covering every `Recommendation` variant once, used by
/// the renderer tests below.
fn fixture_report() -> Vec<(TableHealth, Recommendation)> {
vec![
(health(1_000, 0, 0), Recommendation::Healthy),
(
TableHealth {
table_name: "public.events".to_string(),
n_live_tup: 100_000_000,
n_dead_tup: 0,
last_analyze: None,
partition_count: 0,
},
Recommendation::PartitionRecommended {
reason:
"table has 100000000 live rows but is not partitioned (threshold: 10000000)"
.to_string(),
},
),
(
TableHealth {
table_name: "public.orders".to_string(),
n_live_tup: 100_000_000,
n_dead_tup: 0,
last_analyze: None,
partition_count: 4,
},
Recommendation::PartitionCountIncrease {
current: 4,
suggested: 8,
},
),
(
TableHealth {
table_name: "public.users".to_string(),
n_live_tup: 50,
n_dead_tup: 50,
last_analyze: None,
partition_count: 0,
},
Recommendation::VacuumNeeded {
dead_tup_ratio: 0.5,
},
),
]
}
/// The human renderer prints the header, every table exactly once, every
/// recommendation tag, and keeps the input order.
#[test]
fn render_human_lists_every_table_in_input_order() {
let report = fixture_report();
let mut buf = Vec::new();
render_human(&report, &mut buf).expect("render");
let s = String::from_utf8(buf).expect("utf8");
assert!(s.contains("TABLE"), "missing header: {s}");
assert!(s.contains("RECOMMENDATION"), "missing header: {s}");
for table in [
"test_table",
"public.events",
"public.orders",
"public.users",
] {
assert_eq!(
s.matches(table).count(),
1,
"table `{table}` should appear once: {s}"
);
}
assert!(s.contains("healthy"), "healthy tag missing: {s}");
assert!(
s.contains("vacuum (dead_tup_ratio="),
"vacuum tag missing: {s}"
);
assert!(
s.contains("partition (table has 100000000"),
"partition tag missing: {s}"
);
assert!(
s.contains("parts++ (current=4, suggested=8)"),
"parts++ tag missing: {s}"
);
// Byte offsets of the names must be increasing in fixture order.
let test_idx = s.find("test_table").unwrap();
let events_idx = s.find("public.events").unwrap();
let orders_idx = s.find("public.orders").unwrap();
let users_idx = s.find("public.users").unwrap();
assert!(
test_idx < events_idx && events_idx < orders_idx && orders_idx < users_idx,
"input order not preserved: {s}"
);
}
/// Rendering the same report twice yields identical bytes.
#[test]
fn render_human_is_byte_stable() {
let report = fixture_report();
let mut a = Vec::new();
let mut b = Vec::new();
render_human(&report, &mut a).unwrap();
render_human(&report, &mut b).unwrap();
assert_eq!(a, b, "render_human is not byte-stable");
}
/// The JSON output parses as an array, carries every snake_case `kind`,
/// keeps the declared key order, and is byte-stable across runs.
#[test]
fn render_json_parses_and_field_order_is_stable() {
let report = fixture_report();
let mut buf = Vec::new();
render_json(&report, &mut buf).expect("render");
let parsed: serde_json::Value =
serde_json::from_slice(&buf).expect("output must be valid JSON");
let array = parsed.as_array().expect("output must be a JSON array");
assert_eq!(array.len(), 4, "expected 4 fixture rows: {parsed}");
let kinds: Vec<&str> = array
.iter()
.filter_map(|row| row.get("recommendation"))
.filter_map(|rec| rec.get("kind"))
.filter_map(|k| k.as_str())
.collect();
assert!(kinds.contains(&"healthy"), "kinds: {kinds:?}");
assert!(kinds.contains(&"vacuum_needed"), "kinds: {kinds:?}");
assert!(kinds.contains(&"partition_recommended"), "kinds: {kinds:?}");
assert!(
kinds.contains(&"partition_count_increase"),
"kinds: {kinds:?}"
);
// NOTE(review): this key-order check presumably relies on serde_json
// preserving insertion order in `Value` objects — confirm the
// `preserve_order` feature is enabled for the test build.
let first = array.first().expect("non-empty");
let obj = first.as_object().expect("row must be an object");
let keys: Vec<&str> = obj.keys().map(String::as_str).collect();
assert_eq!(
keys,
vec![
"table_name",
"n_live_tup",
"n_dead_tup",
"last_analyze",
"partition_count",
"recommendation",
],
"field order must match Row struct declaration: {keys:?}"
);
let mut buf2 = Vec::new();
render_json(&report, &mut buf2).unwrap();
assert_eq!(buf, buf2, "render_json is not byte-stable");
}
/// An empty report renders as `[]` plus the trailing newline.
#[test]
fn render_json_handles_empty_input() {
let report: Vec<(TableHealth, Recommendation)> = Vec::new();
let mut buf = Vec::new();
render_json(&report, &mut buf).expect("render");
let s = String::from_utf8(buf).expect("utf8");
assert_eq!(s, "[]\n", "empty input should produce `[]\\n`, got: {s:?}");
}
/// An empty report still prints the header line, and nothing else.
#[test]
fn render_human_handles_empty_input() {
let report: Vec<(TableHealth, Recommendation)> = Vec::new();
let mut buf = Vec::new();
render_human(&report, &mut buf).expect("render");
let s = String::from_utf8(buf).expect("utf8");
assert_eq!(
s.lines().count(),
1,
"expected just a header line, got: {s:?}"
);
assert!(s.contains("TABLE"));
}
/// Only the three "object does not exist" SQLSTATEs count as partman being
/// absent; syntax, privilege, and connection errors do not.
#[test]
fn is_partman_absent_code_recognises_three_states() {
assert!(is_partman_absent_code(&SqlState::UNDEFINED_FUNCTION));
assert!(is_partman_absent_code(&SqlState::UNDEFINED_TABLE));
assert!(is_partman_absent_code(&SqlState::INVALID_SCHEMA_NAME));
assert!(!is_partman_absent_code(&SqlState::SYNTAX_ERROR));
assert!(!is_partman_absent_code(&SqlState::INSUFFICIENT_PRIVILEGE));
assert!(!is_partman_absent_code(&SqlState::CONNECTION_FAILURE));
}
/// Every `AnalyzeError` variant's `Display` output names the failing step,
/// includes the underlying detail, and carries a remediation hint.
#[test]
fn analyze_error_display_is_operator_actionable() {
let cfg = AnalyzeError::Config("bad toml".to_string());
let s = format!("{cfg}");
assert!(s.contains("config load"), "config display: {s}");
assert!(s.contains("bad toml"), "config display: {s}");
assert!(
s.contains("Verify"),
"config display must carry a remediation keyword: {s}"
);
assert!(
s.contains("Djogi.toml"),
"config display must point at Djogi.toml: {s}"
);
let pool = AnalyzeError::Pool {
url: "postgres://localhost/x".to_string(),
message: "refused".to_string(),
};
let s = format!("{pool}");
assert!(s.contains("postgres://localhost/x"), "pool display: {s}");
assert!(s.contains("refused"), "pool display: {s}");
assert!(
s.contains("Djogi.toml::database.url"),
"pool display must point at config: {s}"
);
assert!(
s.contains("Verify"),
"pool display must carry a remediation keyword: {s}"
);
let db = AnalyzeError::Db("relation \"foo\" does not exist".to_string());
let s = format!("{db}");
assert!(s.contains("live-DB query"), "db display: {s}");
assert!(s.contains("relation \"foo\""), "db display: {s}");
assert!(
s.contains("Verify"),
"db display must carry a remediation keyword: {s}"
);
assert!(
s.contains("pg_stat_user_tables"),
"db display must mention the catalogue we query: {s}"
);
let io = AnalyzeError::Io(std::io::Error::other("broken pipe"));
let s = format!("{io}");
assert!(s.contains("writing analyze output"), "io display: {s}");
assert!(s.contains("broken pipe"), "io display: {s}");
assert!(
s.contains("Check"),
"io display must carry a remediation keyword: {s}"
);
// A parse failure is just a convenient way to obtain a `serde_json::Error`.
let json_err: serde_json::Error =
serde_json::from_str::<serde_json::Value>("not json").unwrap_err();
let json = AnalyzeError::Json(json_err);
let s = format!("{json}");
assert!(s.contains("encoding analyze output"), "json display: {s}");
assert!(
s.contains("file an issue"),
"json display must point at the issue tracker: {s}"
);
}
}