sqldiff/schema/
connection.rs

1use anyhow::{Error, Result};
2use itertools::Itertools;
3use sqlx::PgConnection;
4use sqlx::postgres::PgPoolOptions;
5
6use crate::{Schema, schema};
7use crate::schema::{Column, Table};
8
/// SQL that lists the columns of every table in one schema, read from
/// `information_schema.columns`.
///
/// `$1` is the schema name. Rows are ordered by `table_name` and then by
/// `ordinal_position`, so all rows belonging to one table are adjacent and
/// appear in column order.
const QUERY_COLUMNS: &str = "SELECT
	table_name, column_name, ordinal_position, is_nullable, data_type
FROM information_schema.columns
WHERE
	table_schema=$1
ORDER BY table_name, ordinal_position";
15
/// SQL that lists the tables of one schema, read from
/// `information_schema.tables`.
///
/// `$1` is the schema name. Each row carries the schema name and the table
/// name; no ordering is requested.
const QUERY_TABLES: &str = "SELECT
    table_schema
    , table_name
FROM information_schema.tables
WHERE
    table_schema = $1 ";
22
23#[derive(sqlx::FromRow)]
24pub struct SchemaColumn {
25    pub table_name: String,
26    pub column_name: String,
27    pub ordinal_position: i32,
28    pub is_nullable: String,
29    pub data_type: String,
30}
31
32pub async fn query_schema_columns(mut conn: &mut PgConnection, schema_name: &str) -> Result<Vec<SchemaColumn>> {
33    let result = sqlx::query_as::<_, SchemaColumn>(QUERY_COLUMNS)
34        .bind(schema_name)
35        .fetch_all(conn)
36        .await?;
37    Ok(result)
38}
39
40#[derive(sqlx::FromRow)]
41pub struct TableSchema {
42    pub table_schema: String,
43    pub table_name: String,
44}
45
46pub async fn query_table_names(mut conn: &mut PgConnection, schema_name: &str) -> Result<Vec<String>> {
47    let result = sqlx::query_as::<_, TableSchema>(QUERY_TABLES)
48        .bind(schema_name)
49        .fetch_all(conn)
50        .await?;
51    Ok(result.into_iter().map(|t| t.table_name).collect())
52}
53
54
55impl Schema {
56    pub async fn try_from_database(conn: &mut PgConnection, schema_name: &str) -> Result<Schema> {
57        let column_schemas = query_schema_columns(conn, schema_name).await?;
58        let mut tables = column_schemas.into_iter()
59            .group_by(|c| c.table_name.clone())
60            .into_iter()
61            .map(|(table_name, group)| {
62                let columns = group.map(|c: SchemaColumn| {
63                    let nullable = c.is_nullable == "YES";
64                    let typ = schema::Type::from_str(&c.data_type)?;
65                    Ok(Column {
66                        name: c.column_name.clone(),
67                        typ,
68                        nullable,
69                        primary_key: false,
70                        default: None,
71                    })
72                }).collect::<Result<Vec<_>, Error>>()?;
73                Ok(Table { name: table_name, columns, indexes: vec![] })
74            })
75            .collect::<Result<Vec<_>, Error>>()?;
76
77        // Degenerate case but you can have tables with no columns...
78        let table_names = query_table_names(conn, schema_name).await?;
79        for name in table_names {
80            if tables.iter().any(|t| t.name == name) {
81                continue;
82            }
83            tables.push(Table {
84                name,
85                columns: vec![],
86                indexes: vec![],
87            })
88        }
89        Ok(Schema { tables })
90    }
91}