// waypoint_core/advisor.rs

//! Schema advisor: proactive suggestions for schema improvements.
//!
//! Analyzes the live database schema and produces actionable advisories
//! with generated fix SQL.

use serde::Serialize;
use tokio_postgres::Client;

use crate::db::quote_ident;
use crate::error::Result;

/// Configuration for the schema advisor.
///
/// The `Default` value leaves `run_after_migrate` off and disables no rules.
/// `PartialEq`/`Eq` are derived so configurations can be compared directly
/// (for example in tests or when detecting configuration changes).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct AdvisorConfig {
    /// Whether to run the advisor after migrations.
    pub run_after_migrate: bool,
    /// List of rule IDs to disable (e.g., ["A003", "A006"]).
    ///
    /// IDs are compared verbatim against the rule identifiers.
    pub disabled_rules: Vec<String>,
}

21/// Severity of an advisory.
22#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
23#[serde(rename_all = "lowercase")]
24pub enum AdvisorySeverity {
25    Info,
26    Suggestion,
27    Warning,
28}
29
30impl std::fmt::Display for AdvisorySeverity {
31    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
32        match self {
33            Self::Info => write!(f, "info"),
34            Self::Suggestion => write!(f, "suggestion"),
35            Self::Warning => write!(f, "warning"),
36        }
37    }
38}
39
40/// A single advisory finding.
41#[derive(Debug, Clone, Serialize)]
42pub struct Advisory {
43    /// Rule ID (e.g., "A001").
44    pub rule_id: String,
45    /// Category of the advisory.
46    pub category: String,
47    /// Severity level.
48    pub severity: AdvisorySeverity,
49    /// Affected database object (e.g., "users.email", "idx_name").
50    pub object: String,
51    /// Human-readable explanation of the issue.
52    pub explanation: String,
53    /// Generated SQL to fix the issue.
54    pub fix_sql: Option<String>,
55}
56
57/// Report from the schema advisor.
58#[derive(Debug, Clone, Serialize)]
59pub struct AdvisorReport {
60    /// Schema that was analyzed.
61    pub schema: String,
62    /// All advisory findings.
63    pub advisories: Vec<Advisory>,
64    /// Count of warnings.
65    pub warning_count: usize,
66    /// Count of suggestions.
67    pub suggestion_count: usize,
68    /// Count of info items.
69    pub info_count: usize,
70}
71
72/// Run all advisory rules against the database schema.
73pub async fn analyze(
74    client: &Client,
75    schema: &str,
76    config: &AdvisorConfig,
77) -> Result<AdvisorReport> {
78    let mut advisories = Vec::new();
79
80    if !config.disabled_rules.contains(&"A001".to_string()) {
81        advisories.extend(check_a001_fk_without_index(client, schema).await?);
82    }
83    if !config.disabled_rules.contains(&"A002".to_string()) {
84        advisories.extend(check_a002_unused_indexes(client, schema).await?);
85    }
86    if !config.disabled_rules.contains(&"A003".to_string()) {
87        advisories.extend(check_a003_timestamp_without_tz(client, schema).await?);
88    }
89    if !config.disabled_rules.contains(&"A004".to_string()) {
90        advisories.extend(check_a004_table_without_pk(client, schema).await?);
91    }
92    if !config.disabled_rules.contains(&"A005".to_string()) {
93        advisories.extend(check_a005_nullable_all_nonnull(client, schema).await?);
94    }
95    if !config.disabled_rules.contains(&"A006".to_string()) {
96        advisories.extend(check_a006_varchar_without_limit(client, schema).await?);
97    }
98    if !config.disabled_rules.contains(&"A007".to_string()) {
99        advisories.extend(check_a007_duplicate_indexes(client, schema).await?);
100    }
101    if !config.disabled_rules.contains(&"A008".to_string()) {
102        advisories.extend(check_a008_seq_scan_large_table(client, schema).await?);
103    }
104    if !config.disabled_rules.contains(&"A009".to_string()) {
105        advisories.extend(check_a009_large_enum(client, schema).await?);
106    }
107    if !config.disabled_rules.contains(&"A010".to_string()) {
108        advisories.extend(check_a010_orphaned_sequences(client, schema).await?);
109    }
110
111    let warning_count = advisories
112        .iter()
113        .filter(|a| a.severity == AdvisorySeverity::Warning)
114        .count();
115    let suggestion_count = advisories
116        .iter()
117        .filter(|a| a.severity == AdvisorySeverity::Suggestion)
118        .count();
119    let info_count = advisories
120        .iter()
121        .filter(|a| a.severity == AdvisorySeverity::Info)
122        .count();
123
124    Ok(AdvisorReport {
125        schema: schema.to_string(),
126        advisories,
127        warning_count,
128        suggestion_count,
129        info_count,
130    })
131}
132
133/// Generate combined fix SQL from all advisories.
134pub fn generate_fix_sql(report: &AdvisorReport) -> String {
135    let fixes: Vec<String> = report
136        .advisories
137        .iter()
138        .filter_map(|a| {
139            a.fix_sql.as_ref().map(|sql| {
140                format!(
141                    "-- {} [{}]: {}\n{}",
142                    a.rule_id, a.severity, a.explanation, sql
143                )
144            })
145        })
146        .collect();
147    fixes.join("\n\n")
148}
149
150// ── A001: Foreign key column missing index ──
151
152async fn check_a001_fk_without_index(client: &Client, schema: &str) -> Result<Vec<Advisory>> {
153    let sql = r#"
154        SELECT
155            tc.table_name,
156            kcu.column_name,
157            tc.constraint_name
158        FROM information_schema.table_constraints tc
159        JOIN information_schema.key_column_usage kcu
160            ON tc.constraint_name = kcu.constraint_name
161            AND tc.table_schema = kcu.table_schema
162        WHERE tc.constraint_type = 'FOREIGN KEY'
163            AND tc.table_schema = $1
164            AND NOT EXISTS (
165                SELECT 1 FROM pg_indexes pi
166                WHERE pi.schemaname = $1
167                    AND pi.tablename = tc.table_name
168                    AND pi.indexdef LIKE '%' || kcu.column_name || '%'
169            )
170        ORDER BY tc.table_name, kcu.column_name
171    "#;
172
173    let rows = client.query(sql, &[&schema]).await?;
174    Ok(rows
175        .iter()
176        .map(|r| {
177            let table: String = r.get(0);
178            let column: String = r.get(1);
179            Advisory {
180                rule_id: "A001".to_string(),
181                category: "Performance".to_string(),
182                severity: AdvisorySeverity::Warning,
183                object: format!("{}.{}", table, column),
184                explanation: format!(
185                    "Foreign key column {}.{} has no index, which can cause slow joins and constraint checks",
186                    table, column
187                ),
188                fix_sql: Some(format!(
189                    "CREATE INDEX idx_{}_{} ON {} ({});",
190                    table, column,
191                    quote_ident(&table),
192                    quote_ident(&column)
193                )),
194            }
195        })
196        .collect())
197}
198
199// ── A002: Unused indexes ──
200
201async fn check_a002_unused_indexes(client: &Client, schema: &str) -> Result<Vec<Advisory>> {
202    let sql = r#"
203        SELECT
204            s.indexrelname AS index_name,
205            s.relname AS table_name,
206            s.idx_scan AS scans
207        FROM pg_stat_user_indexes s
208        JOIN pg_index i ON s.indexrelid = i.indexrelid
209        WHERE s.schemaname = $1
210            AND s.idx_scan = 0
211            AND NOT i.indisprimary
212            AND NOT i.indisunique
213        ORDER BY s.relname, s.indexrelname
214    "#;
215
216    let rows = client.query(sql, &[&schema]).await?;
217    Ok(rows
218        .iter()
219        .map(|r| {
220            let index_name: String = r.get(0);
221            let table_name: String = r.get(1);
222            Advisory {
223                rule_id: "A002".to_string(),
224                category: "Performance".to_string(),
225                severity: AdvisorySeverity::Suggestion,
226                object: index_name.clone(),
227                explanation: format!(
228                    "Index {} on {} has never been used (0 scans). Consider removing it to reduce write overhead",
229                    index_name, table_name
230                ),
231                fix_sql: Some(format!("DROP INDEX {};", quote_ident(&index_name))),
232            }
233        })
234        .collect())
235}
236
237// ── A003: TIMESTAMP without timezone ──
238
239async fn check_a003_timestamp_without_tz(client: &Client, schema: &str) -> Result<Vec<Advisory>> {
240    let sql = r#"
241        SELECT table_name, column_name
242        FROM information_schema.columns
243        WHERE table_schema = $1
244            AND data_type = 'timestamp without time zone'
245        ORDER BY table_name, column_name
246    "#;
247
248    let rows = client.query(sql, &[&schema]).await?;
249    Ok(rows
250        .iter()
251        .map(|r| {
252            let table: String = r.get(0);
253            let column: String = r.get(1);
254            Advisory {
255                rule_id: "A003".to_string(),
256                category: "Correctness".to_string(),
257                severity: AdvisorySeverity::Warning,
258                object: format!("{}.{}", table, column),
259                explanation: format!(
260                    "Column {}.{} uses TIMESTAMP WITHOUT TIME ZONE. Use TIMESTAMPTZ to avoid timezone ambiguity",
261                    table, column
262                ),
263                fix_sql: Some(format!(
264                    "ALTER TABLE {} ALTER COLUMN {} TYPE TIMESTAMPTZ;",
265                    quote_ident(&table),
266                    quote_ident(&column)
267                )),
268            }
269        })
270        .collect())
271}
272
273// ── A004: Table without primary key ──
274
275async fn check_a004_table_without_pk(client: &Client, schema: &str) -> Result<Vec<Advisory>> {
276    let sql = r#"
277        SELECT t.table_name
278        FROM information_schema.tables t
279        WHERE t.table_schema = $1
280            AND t.table_type = 'BASE TABLE'
281            AND NOT EXISTS (
282                SELECT 1 FROM information_schema.table_constraints tc
283                WHERE tc.table_schema = $1
284                    AND tc.table_name = t.table_name
285                    AND tc.constraint_type = 'PRIMARY KEY'
286            )
287        ORDER BY t.table_name
288    "#;
289
290    let rows = client.query(sql, &[&schema]).await?;
291    Ok(rows
292        .iter()
293        .map(|r| {
294            let table: String = r.get(0);
295            Advisory {
296                rule_id: "A004".to_string(),
297                category: "Correctness".to_string(),
298                severity: AdvisorySeverity::Warning,
299                object: table.clone(),
300                explanation: format!(
301                    "Table {} has no primary key. This prevents logical replication and makes row identification unreliable",
302                    table
303                ),
304                fix_sql: None, // Can't auto-generate a PK without knowing the table
305            }
306        })
307        .collect())
308}
309
310// ── A005: Nullable column where all values are non-null ──
311
312async fn check_a005_nullable_all_nonnull(client: &Client, schema: &str) -> Result<Vec<Advisory>> {
313    // Only check tables with at least 100 rows to avoid false positives on empty tables
314    let sql = r#"
315        SELECT c.table_name, c.column_name
316        FROM information_schema.columns c
317        JOIN pg_stat_user_tables s
318            ON c.table_name = s.relname AND s.schemaname = $1
319        WHERE c.table_schema = $1
320            AND c.is_nullable = 'YES'
321            AND s.n_live_tup > 100
322            AND c.column_default IS NULL
323        ORDER BY c.table_name, c.column_name
324    "#;
325
326    let rows = client.query(sql, &[&schema]).await?;
327    let mut advisories = Vec::new();
328
329    for row in &rows {
330        let table: String = row.get(0);
331        let column: String = row.get(1);
332
333        // Check if any nulls actually exist
334        let null_check = format!(
335            "SELECT EXISTS (SELECT 1 FROM {} WHERE {} IS NULL LIMIT 1)",
336            quote_ident(&table),
337            quote_ident(&column)
338        );
339        if let Ok(result) = client.query_one(&null_check, &[]).await {
340            let has_nulls: bool = result.get(0);
341            if !has_nulls {
342                advisories.push(Advisory {
343                    rule_id: "A005".to_string(),
344                    category: "Correctness".to_string(),
345                    severity: AdvisorySeverity::Info,
346                    object: format!("{}.{}", table, column),
347                    explanation: format!(
348                        "Column {}.{} is nullable but contains no NULL values. Consider adding NOT NULL constraint",
349                        table, column
350                    ),
351                    fix_sql: Some(format!(
352                        "ALTER TABLE {} ALTER COLUMN {} SET NOT NULL;",
353                        quote_ident(&table),
354                        quote_ident(&column)
355                    )),
356                });
357            }
358        }
359    }
360
361    Ok(advisories)
362}
363
364// ── A006: VARCHAR without length limit ──
365
366async fn check_a006_varchar_without_limit(client: &Client, schema: &str) -> Result<Vec<Advisory>> {
367    let sql = r#"
368        SELECT table_name, column_name
369        FROM information_schema.columns
370        WHERE table_schema = $1
371            AND data_type = 'character varying'
372            AND character_maximum_length IS NULL
373        ORDER BY table_name, column_name
374    "#;
375
376    let rows = client.query(sql, &[&schema]).await?;
377    Ok(rows
378        .iter()
379        .map(|r| {
380            let table: String = r.get(0);
381            let column: String = r.get(1);
382            Advisory {
383                rule_id: "A006".to_string(),
384                category: "Design".to_string(),
385                severity: AdvisorySeverity::Info,
386                object: format!("{}.{}", table, column),
387                explanation: format!(
388                    "Column {}.{} is VARCHAR without length limit. Consider using TEXT or adding a length constraint",
389                    table, column
390                ),
391                fix_sql: None,
392            }
393        })
394        .collect())
395}
396
397// ── A007: Duplicate indexes ──
398
399async fn check_a007_duplicate_indexes(client: &Client, schema: &str) -> Result<Vec<Advisory>> {
400    let sql = r#"
401        SELECT
402            a.indexname AS index_a,
403            b.indexname AS index_b,
404            a.tablename
405        FROM pg_indexes a
406        JOIN pg_indexes b
407            ON a.tablename = b.tablename
408            AND a.schemaname = b.schemaname
409            AND a.indexname < b.indexname
410            AND a.indexdef = b.indexdef
411        WHERE a.schemaname = $1
412        ORDER BY a.tablename, a.indexname
413    "#;
414
415    let rows = client.query(sql, &[&schema]).await?;
416    Ok(rows
417        .iter()
418        .map(|r| {
419            let index_a: String = r.get(0);
420            let index_b: String = r.get(1);
421            let table: String = r.get(2);
422            Advisory {
423                rule_id: "A007".to_string(),
424                category: "Design".to_string(),
425                severity: AdvisorySeverity::Warning,
426                object: format!("{}, {}", index_a, index_b),
427                explanation: format!(
428                    "Indexes {} and {} on table {} have identical definitions. Remove the duplicate",
429                    index_a, index_b, table
430                ),
431                fix_sql: Some(format!("DROP INDEX {};", quote_ident(&index_b))),
432            }
433        })
434        .collect())
435}
436
437// ── A008: Sequential scan on large table ──
438
439async fn check_a008_seq_scan_large_table(client: &Client, schema: &str) -> Result<Vec<Advisory>> {
440    let sql = r#"
441        SELECT
442            relname,
443            seq_scan,
444            n_live_tup
445        FROM pg_stat_user_tables
446        WHERE schemaname = $1
447            AND n_live_tup > 100000
448            AND seq_scan > 0
449            AND seq_scan > idx_scan
450        ORDER BY seq_scan DESC
451    "#;
452
453    let rows = client.query(sql, &[&schema]).await?;
454    Ok(rows
455        .iter()
456        .map(|r| {
457            let table: String = r.get(0);
458            let seq_scans: i64 = r.get(1);
459            let row_count: i64 = r.get(2);
460            Advisory {
461                rule_id: "A008".to_string(),
462                category: "Performance".to_string(),
463                severity: AdvisorySeverity::Warning,
464                object: table.clone(),
465                explanation: format!(
466                    "Table {} (~{} rows) has {} sequential scans exceeding index scans. Consider adding indexes",
467                    table, row_count, seq_scans
468                ),
469                fix_sql: None,
470            }
471        })
472        .collect())
473}
474
475// ── A009: Enum with >20 values ──
476
477async fn check_a009_large_enum(client: &Client, schema: &str) -> Result<Vec<Advisory>> {
478    let sql = r#"
479        SELECT t.typname, count(e.enumlabel)::int AS label_count
480        FROM pg_type t
481        JOIN pg_enum e ON e.enumtypid = t.oid
482        JOIN pg_namespace n ON n.oid = t.typnamespace
483        WHERE n.nspname = $1
484        GROUP BY t.typname
485        HAVING count(e.enumlabel) > 20
486        ORDER BY t.typname
487    "#;
488
489    let rows = client.query(sql, &[&schema]).await?;
490    Ok(rows
491        .iter()
492        .map(|r| {
493            let name: String = r.get(0);
494            let count: i32 = r.get(1);
495            Advisory {
496                rule_id: "A009".to_string(),
497                category: "Design".to_string(),
498                severity: AdvisorySeverity::Suggestion,
499                object: name.clone(),
500                explanation: format!(
501                    "Enum type {} has {} values. Enums with many values are hard to maintain; consider a lookup table",
502                    name, count
503                ),
504                fix_sql: None,
505            }
506        })
507        .collect())
508}
509
510// ── A010: Orphaned sequences ──
511
512async fn check_a010_orphaned_sequences(client: &Client, schema: &str) -> Result<Vec<Advisory>> {
513    let sql = r#"
514        SELECT s.relname
515        FROM pg_class s
516        JOIN pg_namespace n ON n.oid = s.relnamespace
517        WHERE s.relkind = 'S'
518            AND n.nspname = $1
519            AND NOT EXISTS (
520                SELECT 1 FROM pg_depend d
521                WHERE d.objid = s.oid
522                    AND d.deptype IN ('a', 'i')
523            )
524        ORDER BY s.relname
525    "#;
526
527    let rows = client.query(sql, &[&schema]).await?;
528    Ok(rows
529        .iter()
530        .map(|r| {
531            let name: String = r.get(0);
532            Advisory {
533                rule_id: "A010".to_string(),
534                category: "Correctness".to_string(),
535                severity: AdvisorySeverity::Suggestion,
536                object: name.clone(),
537                explanation: format!(
538                    "Sequence {} is not attached to any column. It may be orphaned",
539                    name
540                ),
541                fix_sql: Some(format!("DROP SEQUENCE IF EXISTS {};", quote_ident(&name))),
542            }
543        })
544        .collect())
545}
546
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a report for the `public` schema holding `advisories`, with the
    /// given warning count and zero suggestions and info items.
    fn report_with(advisories: Vec<Advisory>, warning_count: usize) -> AdvisorReport {
        AdvisorReport {
            schema: "public".to_string(),
            advisories,
            warning_count,
            suggestion_count: 0,
            info_count: 0,
        }
    }

    /// Builds a warning-level advisory from string slices.
    fn warning(
        rule_id: &str,
        category: &str,
        object: &str,
        explanation: &str,
        fix_sql: Option<&str>,
    ) -> Advisory {
        Advisory {
            rule_id: rule_id.to_string(),
            category: category.to_string(),
            severity: AdvisorySeverity::Warning,
            object: object.to_string(),
            explanation: explanation.to_string(),
            fix_sql: fix_sql.map(|sql| sql.to_string()),
        }
    }

    #[test]
    fn test_advisor_config_default() {
        let AdvisorConfig {
            run_after_migrate,
            disabled_rules,
        } = AdvisorConfig::default();
        assert!(!run_after_migrate);
        assert!(disabled_rules.is_empty());
    }

    #[test]
    fn test_generate_fix_sql_empty() {
        let report = report_with(Vec::new(), 0);
        assert!(generate_fix_sql(&report).is_empty());
    }

    #[test]
    fn test_generate_fix_sql_with_advisories() {
        let with_fix = warning(
            "A001",
            "Performance",
            "orders.user_id",
            "FK without index",
            Some("CREATE INDEX idx_orders_user_id ON \"orders\" (\"user_id\");"),
        );
        let without_fix = warning("A004", "Correctness", "logs", "No primary key", None);

        let sql = generate_fix_sql(&report_with(vec![with_fix, without_fix], 2));
        assert!(sql.contains("CREATE INDEX"));
        assert!(sql.contains("A001"));
        // A004 carries no fix SQL, so it must not appear in the output.
        assert!(!sql.contains("A004"));
    }

    #[test]
    fn test_advisory_severity_display() {
        let expected = [
            (AdvisorySeverity::Info, "info"),
            (AdvisorySeverity::Suggestion, "suggestion"),
            (AdvisorySeverity::Warning, "warning"),
        ];
        for (severity, label) in expected {
            assert_eq!(severity.to_string(), label);
        }
    }
}