database_replicator/migration/
schema.rs1use anyhow::{Context, Result};
5use tokio_postgres::Client;
6
7#[derive(Debug, Clone)]
8pub struct DatabaseInfo {
9 pub name: String,
10 pub owner: String,
11}
12
13#[derive(Debug, Clone)]
14pub struct TableInfo {
15 pub schema: String,
16 pub name: String,
17 pub row_count_estimate: i64,
18}
19
20#[derive(Debug, Clone)]
21pub struct ColumnInfo {
22 pub name: String,
23 pub data_type: String,
24 pub is_timestamp: bool,
25}
26
27pub async fn list_databases(client: &Client) -> Result<Vec<DatabaseInfo>> {
29 let rows = client
30 .query(
31 "SELECT datname, pg_catalog.pg_get_userbyid(datdba) as owner
32 FROM pg_catalog.pg_database
33 WHERE datistemplate = false
34 AND datname NOT IN ('postgres', 'template0', 'template1')
35 ORDER BY datname",
36 &[],
37 )
38 .await
39 .context("Failed to list databases")?;
40
41 let databases = rows
42 .iter()
43 .map(|row| DatabaseInfo {
44 name: row.get(0),
45 owner: row.get(1),
46 })
47 .collect();
48
49 Ok(databases)
50}
51
52pub async fn list_tables(client: &Client) -> Result<Vec<TableInfo>> {
54 let rows = client
55 .query(
56 "SELECT
57 pg_tables.schemaname,
58 pg_tables.tablename,
59 COALESCE(n_live_tup, 0) as row_count
60 FROM pg_catalog.pg_tables
61 LEFT JOIN pg_catalog.pg_stat_user_tables
62 ON pg_tables.schemaname = pg_stat_user_tables.schemaname
63 AND pg_tables.tablename = pg_stat_user_tables.relname
64 WHERE pg_tables.schemaname NOT IN ('pg_catalog', 'information_schema')
65 ORDER BY pg_tables.schemaname, pg_tables.tablename",
66 &[],
67 )
68 .await
69 .context("Failed to list tables")?;
70
71 let tables = rows
72 .iter()
73 .map(|row| TableInfo {
74 schema: row.get(0),
75 name: row.get(1),
76 row_count_estimate: row.get(2),
77 })
78 .collect();
79
80 Ok(tables)
81}
82
83pub async fn get_table_columns(
88 client: &Client,
89 schema: &str,
90 table: &str,
91) -> Result<Vec<ColumnInfo>> {
92 let rows = client
93 .query(
94 "SELECT
95 a.attname as column_name,
96 t.typname as data_type,
97 CASE WHEN t.typname IN ('timestamp', 'timestamptz', 'date')
98 THEN true
99 ELSE false
100 END as is_timestamp
101 FROM pg_catalog.pg_attribute a
102 JOIN pg_catalog.pg_class c ON a.attrelid = c.oid
103 JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
104 JOIN pg_catalog.pg_type t ON a.atttypid = t.oid
105 WHERE n.nspname = $1
106 AND c.relname = $2
107 AND a.attnum > 0
108 AND NOT a.attisdropped
109 ORDER BY a.attnum",
110 &[&schema, &table],
111 )
112 .await
113 .with_context(|| format!("Failed to get columns for table '{}'.'{}'", schema, table))?;
114
115 let columns = rows
116 .iter()
117 .map(|row| ColumnInfo {
118 name: row.get(0),
119 data_type: row.get(1),
120 is_timestamp: row.get(2),
121 })
122 .collect();
123
124 Ok(columns)
125}
126
127#[cfg(test)]
128mod tests {
129 use super::*;
130 use crate::postgres::connect;
131
132 #[tokio::test]
133 #[ignore]
134 async fn test_list_databases() {
135 let url = std::env::var("TEST_SOURCE_URL").unwrap();
136 let client = connect(&url).await.unwrap();
137
138 let databases = list_databases(&client).await.unwrap();
139
140 assert!(!databases.is_empty());
142 println!("Found {} databases", databases.len());
143 for db in &databases {
144 println!(" - {} (owner: {})", db.name, db.owner);
145 }
146 }
147
148 #[tokio::test]
149 #[ignore]
150 async fn test_list_tables() {
151 let url = std::env::var("TEST_SOURCE_URL").unwrap();
152 let client = connect(&url).await.unwrap();
153
154 let tables = list_tables(&client).await.unwrap();
155
156 println!("Found {} tables", tables.len());
158 for table in tables.iter().take(10) {
159 println!(
160 " - {}.{} ({} rows)",
161 table.schema, table.name, table.row_count_estimate
162 );
163 }
164 }
165}