sqldiff/schema/
connection.rs1use anyhow::{Error, Result};
2use itertools::Itertools;
3use sqlx::PgConnection;
4use sqlx::postgres::PgPoolOptions;
5
6use crate::{Schema, schema};
7use crate::schema::{Column, Table};
8
9const QUERY_COLUMNS: &str = "SELECT
10 table_name, column_name, ordinal_position, is_nullable, data_type
11FROM information_schema.columns
12WHERE
13 table_schema=$1
14ORDER BY table_name, ordinal_position";
15
16const QUERY_TABLES: &str = "SELECT
17 table_schema
18 , table_name
19FROM information_schema.tables
20WHERE
21 table_schema = $1 ";
22
23#[derive(sqlx::FromRow)]
24pub struct SchemaColumn {
25 pub table_name: String,
26 pub column_name: String,
27 pub ordinal_position: i32,
28 pub is_nullable: String,
29 pub data_type: String,
30}
31
32pub async fn query_schema_columns(mut conn: &mut PgConnection, schema_name: &str) -> Result<Vec<SchemaColumn>> {
33 let result = sqlx::query_as::<_, SchemaColumn>(QUERY_COLUMNS)
34 .bind(schema_name)
35 .fetch_all(conn)
36 .await?;
37 Ok(result)
38}
39
40#[derive(sqlx::FromRow)]
41pub struct TableSchema {
42 pub table_schema: String,
43 pub table_name: String,
44}
45
46pub async fn query_table_names(mut conn: &mut PgConnection, schema_name: &str) -> Result<Vec<String>> {
47 let result = sqlx::query_as::<_, TableSchema>(QUERY_TABLES)
48 .bind(schema_name)
49 .fetch_all(conn)
50 .await?;
51 Ok(result.into_iter().map(|t| t.table_name).collect())
52}
53
54
55impl Schema {
56 pub async fn try_from_database(conn: &mut PgConnection, schema_name: &str) -> Result<Schema> {
57 let column_schemas = query_schema_columns(conn, schema_name).await?;
58 let mut tables = column_schemas.into_iter()
59 .group_by(|c| c.table_name.clone())
60 .into_iter()
61 .map(|(table_name, group)| {
62 let columns = group.map(|c: SchemaColumn| {
63 let nullable = c.is_nullable == "YES";
64 let typ = schema::Type::from_str(&c.data_type)?;
65 Ok(Column {
66 name: c.column_name.clone(),
67 typ,
68 nullable,
69 primary_key: false,
70 default: None,
71 })
72 }).collect::<Result<Vec<_>, Error>>()?;
73 Ok(Table { name: table_name, columns, indexes: vec![] })
74 })
75 .collect::<Result<Vec<_>, Error>>()?;
76
77 let table_names = query_table_names(conn, schema_name).await?;
79 for name in table_names {
80 if tables.iter().any(|t| t.name == name) {
81 continue;
82 }
83 tables.push(Table {
84 name,
85 columns: vec![],
86 indexes: vec![],
87 })
88 }
89 Ok(Schema { tables })
90 }
91}