database_replicator/migration/
schema.rs

1// ABOUTME: Schema introspection utilities for migration planning
2// ABOUTME: Discovers databases, tables, and objects that need migration
3
4use anyhow::{Context, Result};
5use tokio_postgres::Client;
6
/// A database discovered in the cluster, as reported by `pg_catalog.pg_database`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DatabaseInfo {
    /// Database name (`datname`).
    pub name: String,
    /// Name of the role that owns the database (resolved from `datdba`).
    pub owner: String,
}
12
/// A table discovered in the current database.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TableInfo {
    /// Schema that contains the table.
    pub schema: String,
    /// Table name, without the schema qualifier.
    pub name: String,
    /// Estimated number of rows. This is an estimate, not an exact count.
    pub row_count_estimate: i64,
}
19
/// A column of a table, together with its PostgreSQL type name.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ColumnInfo {
    /// Column name.
    pub name: String,
    /// PostgreSQL type name of the column (e.g. `int4`, `timestamptz`).
    pub data_type: String,
    /// Whether the column has a timestamp-like type (`timestamp`, `timestamptz` or `date`).
    pub is_timestamp: bool,
}
26
27/// List all non-system databases in the cluster
28///
29/// Excludes:
30/// - PostgreSQL template databases (template0, template1)
31/// - PostgreSQL default database (postgres)
32/// - AWS RDS internal database (rdsadmin)
33pub async fn list_databases(client: &Client) -> Result<Vec<DatabaseInfo>> {
34    let rows = client
35        .query(
36            "SELECT datname, pg_catalog.pg_get_userbyid(datdba) as owner
37             FROM pg_catalog.pg_database
38             WHERE datistemplate = false
39               AND datname NOT IN ('postgres', 'template0', 'template1', 'rdsadmin')
40             ORDER BY datname",
41            &[],
42        )
43        .await
44        .context("Failed to list databases")?;
45
46    let databases = rows
47        .iter()
48        .map(|row| DatabaseInfo {
49            name: row.get(0),
50            owner: row.get(1),
51        })
52        .collect();
53
54    Ok(databases)
55}
56
57/// List all tables in the current database
58pub async fn list_tables(client: &Client) -> Result<Vec<TableInfo>> {
59    let rows = client
60        .query(
61            "SELECT
62                pg_tables.schemaname,
63                pg_tables.tablename,
64                COALESCE(n_live_tup, 0) as row_count
65             FROM pg_catalog.pg_tables
66             LEFT JOIN pg_catalog.pg_stat_user_tables
67                ON pg_tables.schemaname = pg_stat_user_tables.schemaname
68                AND pg_tables.tablename = pg_stat_user_tables.relname
69             WHERE pg_tables.schemaname NOT IN ('pg_catalog', 'information_schema')
70             ORDER BY pg_tables.schemaname, pg_tables.tablename",
71            &[],
72        )
73        .await
74        .context("Failed to list tables")?;
75
76    let tables = rows
77        .iter()
78        .map(|row| TableInfo {
79            schema: row.get(0),
80            name: row.get(1),
81            row_count_estimate: row.get(2),
82        })
83        .collect();
84
85    Ok(tables)
86}
87
88/// Get columns for a specific table with their types
89///
90/// Returns all columns with a flag indicating if they are timestamp-like types.
91/// Timestamp types include: timestamp, timestamptz, date
92pub async fn get_table_columns(
93    client: &Client,
94    schema: &str,
95    table: &str,
96) -> Result<Vec<ColumnInfo>> {
97    let rows = client
98        .query(
99            "SELECT
100                a.attname as column_name,
101                t.typname as data_type,
102                CASE WHEN t.typname IN ('timestamp', 'timestamptz', 'date')
103                     THEN true
104                     ELSE false
105                END as is_timestamp
106             FROM pg_catalog.pg_attribute a
107             JOIN pg_catalog.pg_class c ON a.attrelid = c.oid
108             JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
109             JOIN pg_catalog.pg_type t ON a.atttypid = t.oid
110             WHERE n.nspname = $1
111               AND c.relname = $2
112               AND a.attnum > 0
113               AND NOT a.attisdropped
114             ORDER BY a.attnum",
115            &[&schema, &table],
116        )
117        .await
118        .with_context(|| format!("Failed to get columns for table '{}'.'{}'", schema, table))?;
119
120    let columns = rows
121        .iter()
122        .map(|row| ColumnInfo {
123            name: row.get(0),
124            data_type: row.get(1),
125            is_timestamp: row.get(2),
126        })
127        .collect();
128
129    Ok(columns)
130}
131
// Integration tests: they need a live PostgreSQL server, so they are marked
// `#[ignore]` and read the connection URL from `TEST_SOURCE_URL`.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::postgres::connect;

    /// Read the connection URL used by the integration tests.
    ///
    /// Panics with an actionable message when the variable is missing.
    fn source_url() -> String {
        std::env::var("TEST_SOURCE_URL")
            .expect("TEST_SOURCE_URL must be set to run the ignored integration tests")
    }

    #[tokio::test]
    #[ignore]
    async fn test_list_databases() {
        let url = source_url();
        let client = connect(&url).await.unwrap();

        let databases = list_databases(&client).await.unwrap();

        // Should have at least the current database
        assert!(!databases.is_empty());
        println!("Found {} databases", databases.len());
        for db in &databases {
            println!("  - {} (owner: {})", db.name, db.owner);
        }
    }

    #[tokio::test]
    #[ignore]
    async fn test_list_tables() {
        let url = source_url();
        let client = connect(&url).await.unwrap();

        let tables = list_tables(&client).await.unwrap();

        // Result depends on test database, but should not error
        println!("Found {} tables", tables.len());
        for table in tables.iter().take(10) {
            println!(
                "  - {}.{} ({} rows)",
                table.schema, table.name, table.row_count_estimate
            );
        }
    }

    #[tokio::test]
    #[ignore]
    async fn test_get_table_columns() {
        let url = source_url();
        let client = connect(&url).await.unwrap();

        // The pg_database system catalog is expected on any PostgreSQL server,
        // so this test needs no fixture tables in the test database.
        let columns = get_table_columns(&client, "pg_catalog", "pg_database")
            .await
            .unwrap();

        let datname = columns
            .iter()
            .find(|column| column.name == "datname")
            .expect("pg_catalog.pg_database should expose a datname column");
        assert!(!datname.is_timestamp);

        println!("Found {} columns", columns.len());
        for column in &columns {
            println!(
                "  - {} ({}, timestamp: {})",
                column.name, column.data_type, column.is_timestamp
            );
        }
    }
}
170}