database_replicator/migration/
schema.rs1use anyhow::{Context, Result};
5use tokio_postgres::Client;
6
7#[derive(Debug, Clone)]
8pub struct DatabaseInfo {
9 pub name: String,
10 pub owner: String,
11}
12
13#[derive(Debug, Clone)]
14pub struct TableInfo {
15 pub schema: String,
16 pub name: String,
17 pub row_count_estimate: i64,
18}
19
20#[derive(Debug, Clone)]
21pub struct ColumnInfo {
22 pub name: String,
23 pub data_type: String,
24 pub is_timestamp: bool,
25}
26
27pub async fn list_databases(client: &Client) -> Result<Vec<DatabaseInfo>> {
34 let rows = client
35 .query(
36 "SELECT datname, pg_catalog.pg_get_userbyid(datdba) as owner
37 FROM pg_catalog.pg_database
38 WHERE datistemplate = false
39 AND datname NOT IN ('postgres', 'template0', 'template1', 'rdsadmin')
40 ORDER BY datname",
41 &[],
42 )
43 .await
44 .context("Failed to list databases")?;
45
46 let databases = rows
47 .iter()
48 .map(|row| DatabaseInfo {
49 name: row.get(0),
50 owner: row.get(1),
51 })
52 .collect();
53
54 Ok(databases)
55}
56
57pub async fn list_tables(client: &Client) -> Result<Vec<TableInfo>> {
59 let rows = client
60 .query(
61 "SELECT
62 pg_tables.schemaname,
63 pg_tables.tablename,
64 COALESCE(n_live_tup, 0) as row_count
65 FROM pg_catalog.pg_tables
66 LEFT JOIN pg_catalog.pg_stat_user_tables
67 ON pg_tables.schemaname = pg_stat_user_tables.schemaname
68 AND pg_tables.tablename = pg_stat_user_tables.relname
69 WHERE pg_tables.schemaname NOT IN ('pg_catalog', 'information_schema')
70 ORDER BY pg_tables.schemaname, pg_tables.tablename",
71 &[],
72 )
73 .await
74 .context("Failed to list tables")?;
75
76 let tables = rows
77 .iter()
78 .map(|row| TableInfo {
79 schema: row.get(0),
80 name: row.get(1),
81 row_count_estimate: row.get(2),
82 })
83 .collect();
84
85 Ok(tables)
86}
87
88pub async fn get_table_columns(
93 client: &Client,
94 schema: &str,
95 table: &str,
96) -> Result<Vec<ColumnInfo>> {
97 let rows = client
98 .query(
99 "SELECT
100 a.attname as column_name,
101 t.typname as data_type,
102 CASE WHEN t.typname IN ('timestamp', 'timestamptz', 'date')
103 THEN true
104 ELSE false
105 END as is_timestamp
106 FROM pg_catalog.pg_attribute a
107 JOIN pg_catalog.pg_class c ON a.attrelid = c.oid
108 JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
109 JOIN pg_catalog.pg_type t ON a.atttypid = t.oid
110 WHERE n.nspname = $1
111 AND c.relname = $2
112 AND a.attnum > 0
113 AND NOT a.attisdropped
114 ORDER BY a.attnum",
115 &[&schema, &table],
116 )
117 .await
118 .with_context(|| format!("Failed to get columns for table '{}'.'{}'", schema, table))?;
119
120 let columns = rows
121 .iter()
122 .map(|row| ColumnInfo {
123 name: row.get(0),
124 data_type: row.get(1),
125 is_timestamp: row.get(2),
126 })
127 .collect();
128
129 Ok(columns)
130}
131
132#[cfg(test)]
133mod tests {
134 use super::*;
135 use crate::postgres::connect;
136
137 #[tokio::test]
138 #[ignore]
139 async fn test_list_databases() {
140 let url = std::env::var("TEST_SOURCE_URL").unwrap();
141 let client = connect(&url).await.unwrap();
142
143 let databases = list_databases(&client).await.unwrap();
144
145 assert!(!databases.is_empty());
147 println!("Found {} databases", databases.len());
148 for db in &databases {
149 println!(" - {} (owner: {})", db.name, db.owner);
150 }
151 }
152
153 #[tokio::test]
154 #[ignore]
155 async fn test_list_tables() {
156 let url = std::env::var("TEST_SOURCE_URL").unwrap();
157 let client = connect(&url).await.unwrap();
158
159 let tables = list_tables(&client).await.unwrap();
160
161 println!("Found {} tables", tables.len());
163 for table in tables.iter().take(10) {
164 println!(
165 " - {}.{} ({} rows)",
166 table.schema, table.name, table.row_count_estimate
167 );
168 }
169 }
170}