database_replicator/migration/
schema.rs

// ABOUTME: Schema introspection utilities for migration planning
// ABOUTME: Discovers databases, tables, and columns that need migration
3
4use anyhow::{Context, Result};
5use tokio_postgres::Client;
6
/// A database discovered in the source cluster by `list_databases`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DatabaseInfo {
    /// Database name (`pg_database.datname`).
    pub name: String,
    /// Name of the owning role, as resolved by `pg_get_userbyid(datdba)`.
    pub owner: String,
}
12
/// A table discovered by `list_tables`, with a statistics-based row estimate.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TableInfo {
    /// Schema (namespace) that contains the table.
    pub schema: String,
    /// Unqualified table name.
    pub name: String,
    /// Approximate live row count taken from `pg_stat_user_tables.n_live_tup`.
    ///
    /// This is an estimate, not an exact `COUNT(*)`. It is `0` when no
    /// statistics row exists for the table.
    pub row_count_estimate: i64,
}
19
/// A column of a table, as returned by `get_table_columns`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ColumnInfo {
    /// Column name (`pg_attribute.attname`).
    pub name: String,
    /// Internal type name from `pg_type.typname` (e.g. `int4`, `timestamptz`).
    /// This is not the SQL-standard spelling.
    pub data_type: String,
    /// `true` when `data_type` is `timestamp`, `timestamptz`, or `date`.
    pub is_timestamp: bool,
}
26
27/// List all non-system databases in the cluster
28pub async fn list_databases(client: &Client) -> Result<Vec<DatabaseInfo>> {
29    let rows = client
30        .query(
31            "SELECT datname, pg_catalog.pg_get_userbyid(datdba) as owner
32             FROM pg_catalog.pg_database
33             WHERE datistemplate = false
34               AND datname NOT IN ('postgres', 'template0', 'template1')
35             ORDER BY datname",
36            &[],
37        )
38        .await
39        .context("Failed to list databases")?;
40
41    let databases = rows
42        .iter()
43        .map(|row| DatabaseInfo {
44            name: row.get(0),
45            owner: row.get(1),
46        })
47        .collect();
48
49    Ok(databases)
50}
51
52/// List all tables in the current database
53pub async fn list_tables(client: &Client) -> Result<Vec<TableInfo>> {
54    let rows = client
55        .query(
56            "SELECT
57                pg_tables.schemaname,
58                pg_tables.tablename,
59                COALESCE(n_live_tup, 0) as row_count
60             FROM pg_catalog.pg_tables
61             LEFT JOIN pg_catalog.pg_stat_user_tables
62                ON pg_tables.schemaname = pg_stat_user_tables.schemaname
63                AND pg_tables.tablename = pg_stat_user_tables.relname
64             WHERE pg_tables.schemaname NOT IN ('pg_catalog', 'information_schema')
65             ORDER BY pg_tables.schemaname, pg_tables.tablename",
66            &[],
67        )
68        .await
69        .context("Failed to list tables")?;
70
71    let tables = rows
72        .iter()
73        .map(|row| TableInfo {
74            schema: row.get(0),
75            name: row.get(1),
76            row_count_estimate: row.get(2),
77        })
78        .collect();
79
80    Ok(tables)
81}
82
83/// Get columns for a specific table with their types
84///
85/// Returns all columns with a flag indicating if they are timestamp-like types.
86/// Timestamp types include: timestamp, timestamptz, date
87pub async fn get_table_columns(
88    client: &Client,
89    schema: &str,
90    table: &str,
91) -> Result<Vec<ColumnInfo>> {
92    let rows = client
93        .query(
94            "SELECT
95                a.attname as column_name,
96                t.typname as data_type,
97                CASE WHEN t.typname IN ('timestamp', 'timestamptz', 'date')
98                     THEN true
99                     ELSE false
100                END as is_timestamp
101             FROM pg_catalog.pg_attribute a
102             JOIN pg_catalog.pg_class c ON a.attrelid = c.oid
103             JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
104             JOIN pg_catalog.pg_type t ON a.atttypid = t.oid
105             WHERE n.nspname = $1
106               AND c.relname = $2
107               AND a.attnum > 0
108               AND NOT a.attisdropped
109             ORDER BY a.attnum",
110            &[&schema, &table],
111        )
112        .await
113        .with_context(|| format!("Failed to get columns for table '{}'.'{}'", schema, table))?;
114
115    let columns = rows
116        .iter()
117        .map(|row| ColumnInfo {
118            name: row.get(0),
119            data_type: row.get(1),
120            is_timestamp: row.get(2),
121        })
122        .collect();
123
124    Ok(columns)
125}
126
#[cfg(test)]
mod tests {
    use super::*;
    use crate::postgres::connect;

    // These are integration tests against a live server. They are ignored by
    // default; run them with `cargo test -- --ignored` after pointing
    // TEST_SOURCE_URL at a database.

    #[tokio::test]
    #[ignore = "requires a PostgreSQL server at TEST_SOURCE_URL"]
    async fn test_list_databases() {
        let url = std::env::var("TEST_SOURCE_URL").expect("TEST_SOURCE_URL must be set");
        let client = connect(&url)
            .await
            .expect("failed to connect to TEST_SOURCE_URL");

        let databases = list_databases(&client)
            .await
            .expect("list_databases failed");

        // `list_databases` excludes `postgres` and the templates, so this
        // assumes the test cluster contains at least one other database.
        assert!(!databases.is_empty());
        println!("Found {} databases", databases.len());
        for db in &databases {
            println!("  - {} (owner: {})", db.name, db.owner);
        }
    }

    #[tokio::test]
    #[ignore = "requires a PostgreSQL server at TEST_SOURCE_URL"]
    async fn test_list_tables() {
        let url = std::env::var("TEST_SOURCE_URL").expect("TEST_SOURCE_URL must be set");
        let client = connect(&url)
            .await
            .expect("failed to connect to TEST_SOURCE_URL");

        let tables = list_tables(&client).await.expect("list_tables failed");

        // The result depends on the test database, but the query must not error.
        println!("Found {} tables", tables.len());
        for table in tables.iter().take(10) {
            println!(
                "  - {}.{} ({} rows)",
                table.schema, table.name, table.row_count_estimate
            );
        }
    }

    #[tokio::test]
    #[ignore = "requires a PostgreSQL server at TEST_SOURCE_URL"]
    async fn test_get_table_columns() {
        let url = std::env::var("TEST_SOURCE_URL").expect("TEST_SOURCE_URL must be set");
        let client = connect(&url)
            .await
            .expect("failed to connect to TEST_SOURCE_URL");

        let tables = list_tables(&client).await.expect("list_tables failed");

        // Inspect the first table, if any; an empty database has nothing to check.
        if let Some(table) = tables.first() {
            let columns = get_table_columns(&client, &table.schema, &table.name)
                .await
                .expect("get_table_columns failed");

            println!("Columns of {}.{}:", table.schema, table.name);
            for column in &columns {
                // The timestamp flag must agree with the type names the query tests.
                assert_eq!(
                    column.is_timestamp,
                    matches!(
                        column.data_type.as_str(),
                        "timestamp" | "timestamptz" | "date"
                    )
                );
                println!(
                    "  - {} {} (timestamp: {})",
                    column.name, column.data_type, column.is_timestamp
                );
            }
        }
    }
}