Skip to main content

sqlx_gen/introspect/
postgres.rs

1use crate::error::Result;
2use sqlx::PgPool;
3
4use super::{ColumnInfo, CompositeTypeInfo, DomainInfo, EnumInfo, SchemaInfo, TableInfo};
5
6pub async fn introspect(
7    pool: &PgPool,
8    schemas: &[String],
9    include_views: bool,
10) -> Result<SchemaInfo> {
11    let tables = fetch_tables(pool, schemas).await?;
12    let views = if include_views {
13        fetch_views(pool, schemas).await?
14    } else {
15        Vec::new()
16    };
17    let enums = fetch_enums(pool, schemas).await?;
18    let composite_types = fetch_composite_types(pool, schemas).await?;
19    let domains = fetch_domains(pool, schemas).await?;
20
21    Ok(SchemaInfo {
22        tables,
23        views,
24        enums,
25        composite_types,
26        domains,
27    })
28}
29
30async fn fetch_tables(pool: &PgPool, schemas: &[String]) -> Result<Vec<TableInfo>> {
31    let rows = sqlx::query_as::<_, (String, String, String, String, String, String, i32, bool, Option<String>)>(
32        r#"
33        SELECT
34            c.table_schema,
35            c.table_name,
36            c.column_name,
37            c.data_type,
38            COALESCE(c.udt_name, c.data_type) as udt_name,
39            c.is_nullable,
40            c.ordinal_position,
41            CASE WHEN kcu.column_name IS NOT NULL THEN true ELSE false END AS is_primary_key,
42            c.column_default
43        FROM information_schema.columns c
44        JOIN information_schema.tables t
45            ON t.table_schema = c.table_schema
46            AND t.table_name = c.table_name
47            AND t.table_type = 'BASE TABLE'
48        LEFT JOIN information_schema.table_constraints tc
49            ON tc.table_schema = c.table_schema
50            AND tc.table_name = c.table_name
51            AND tc.constraint_type = 'PRIMARY KEY'
52        LEFT JOIN information_schema.key_column_usage kcu
53            ON kcu.constraint_name = tc.constraint_name
54            AND kcu.constraint_schema = tc.constraint_schema
55            AND kcu.column_name = c.column_name
56        WHERE c.table_schema = ANY($1)
57        ORDER BY c.table_schema, c.table_name, c.ordinal_position
58        "#,
59    )
60    .bind(schemas)
61    .fetch_all(pool)
62    .await?;
63
64    let mut tables: Vec<TableInfo> = Vec::new();
65    let mut current_key: Option<(String, String)> = None;
66
67    for (schema, table, col_name, data_type, udt_name, nullable, ordinal, is_pk, column_default) in rows {
68        let key = (schema.clone(), table.clone());
69        if current_key.as_ref() != Some(&key) {
70            current_key = Some(key);
71            tables.push(TableInfo {
72                schema_name: schema.clone(),
73                name: table.clone(),
74                columns: Vec::new(),
75            });
76        }
77        tables.last_mut().unwrap().columns.push(ColumnInfo {
78            name: col_name,
79            data_type,
80            udt_name,
81            is_nullable: nullable == "YES",
82            is_primary_key: is_pk,
83            ordinal_position: ordinal,
84            schema_name: schema,
85            column_default,
86        });
87    }
88
89    Ok(tables)
90}
91
92async fn fetch_views(pool: &PgPool, schemas: &[String]) -> Result<Vec<TableInfo>> {
93    let rows = sqlx::query_as::<_, (String, String, String, String, String, String, i32, Option<String>)>(
94        r#"
95        SELECT
96            c.table_schema,
97            c.table_name,
98            c.column_name,
99            c.data_type,
100            COALESCE(c.udt_name, c.data_type) as udt_name,
101            c.is_nullable,
102            c.ordinal_position,
103            c.column_default
104        FROM information_schema.columns c
105        JOIN information_schema.tables t
106            ON t.table_schema = c.table_schema
107            AND t.table_name = c.table_name
108            AND t.table_type = 'VIEW'
109        WHERE c.table_schema = ANY($1)
110        ORDER BY c.table_schema, c.table_name, c.ordinal_position
111        "#,
112    )
113    .bind(schemas)
114    .fetch_all(pool)
115    .await?;
116
117    let mut views: Vec<TableInfo> = Vec::new();
118    let mut current_key: Option<(String, String)> = None;
119
120    for (schema, table, col_name, data_type, udt_name, nullable, ordinal, column_default) in rows {
121        let key = (schema.clone(), table.clone());
122        if current_key.as_ref() != Some(&key) {
123            current_key = Some(key);
124            views.push(TableInfo {
125                schema_name: schema.clone(),
126                name: table.clone(),
127                columns: Vec::new(),
128            });
129        }
130        views.last_mut().unwrap().columns.push(ColumnInfo {
131            name: col_name,
132            data_type,
133            udt_name,
134            is_nullable: nullable == "YES",
135            is_primary_key: false,
136            ordinal_position: ordinal,
137            schema_name: schema,
138            column_default,
139        });
140    }
141
142    Ok(views)
143}
144
145async fn fetch_enums(pool: &PgPool, schemas: &[String]) -> Result<Vec<EnumInfo>> {
146    let rows = sqlx::query_as::<_, (String, String, String)>(
147        r#"
148        SELECT
149            n.nspname AS schema_name,
150            t.typname AS enum_name,
151            e.enumlabel AS variant
152        FROM pg_catalog.pg_type t
153        JOIN pg_catalog.pg_enum e ON e.enumtypid = t.oid
154        JOIN pg_catalog.pg_namespace n ON n.oid = t.typnamespace
155        WHERE n.nspname = ANY($1)
156        ORDER BY n.nspname, t.typname, e.enumsortorder
157        "#,
158    )
159    .bind(schemas)
160    .fetch_all(pool)
161    .await?;
162
163    let mut enums: Vec<EnumInfo> = Vec::new();
164    let mut current_key: Option<(String, String)> = None;
165
166    for (schema, name, variant) in rows {
167        let key = (schema.clone(), name.clone());
168        if current_key.as_ref() != Some(&key) {
169            current_key = Some(key);
170            enums.push(EnumInfo {
171                schema_name: schema,
172                name,
173                variants: Vec::new(),
174                default_variant: None,
175            });
176        }
177        enums.last_mut().unwrap().variants.push(variant);
178    }
179
180    Ok(enums)
181}
182
183async fn fetch_composite_types(
184    pool: &PgPool,
185    schemas: &[String],
186) -> Result<Vec<CompositeTypeInfo>> {
187    let rows = sqlx::query_as::<_, (String, String, String, String, String, i32)>(
188        r#"
189        SELECT
190            n.nspname AS schema_name,
191            t.typname AS type_name,
192            a.attname AS field_name,
193            COALESCE(ft.typname, '') AS field_type,
194            CASE WHEN a.attnotnull THEN 'NO' ELSE 'YES' END AS is_nullable,
195            a.attnum AS ordinal
196        FROM pg_catalog.pg_type t
197        JOIN pg_catalog.pg_namespace n ON n.oid = t.typnamespace
198        JOIN pg_catalog.pg_class c ON c.oid = t.typrelid
199        JOIN pg_catalog.pg_attribute a ON a.attrelid = c.oid AND a.attnum > 0 AND NOT a.attisdropped
200        JOIN pg_catalog.pg_type ft ON ft.oid = a.atttypid
201        WHERE t.typtype = 'c'
202            AND n.nspname = ANY($1)
203            AND NOT EXISTS (
204                SELECT 1 FROM information_schema.tables it
205                WHERE it.table_schema = n.nspname AND it.table_name = t.typname
206            )
207        ORDER BY n.nspname, t.typname, a.attnum
208        "#,
209    )
210    .bind(schemas)
211    .fetch_all(pool)
212    .await?;
213
214    let mut composites: Vec<CompositeTypeInfo> = Vec::new();
215    let mut current_key: Option<(String, String)> = None;
216
217    for (schema, type_name, field_name, field_type, nullable, ordinal) in rows {
218        let key = (schema.clone(), type_name.clone());
219        if current_key.as_ref() != Some(&key) {
220            current_key = Some(key);
221            composites.push(CompositeTypeInfo {
222                schema_name: schema.clone(),
223                name: type_name,
224                fields: Vec::new(),
225            });
226        }
227        composites.last_mut().unwrap().fields.push(ColumnInfo {
228            name: field_name,
229            data_type: field_type.clone(),
230            udt_name: field_type,
231            is_nullable: nullable == "YES",
232            is_primary_key: false,
233            ordinal_position: ordinal,
234            schema_name: schema,
235            column_default: None,
236        });
237    }
238
239    Ok(composites)
240}
241
242async fn fetch_domains(pool: &PgPool, schemas: &[String]) -> Result<Vec<DomainInfo>> {
243    let rows = sqlx::query_as::<_, (String, String, String)>(
244        r#"
245        SELECT
246            n.nspname AS schema_name,
247            t.typname AS domain_name,
248            bt.typname AS base_type
249        FROM pg_catalog.pg_type t
250        JOIN pg_catalog.pg_namespace n ON n.oid = t.typnamespace
251        JOIN pg_catalog.pg_type bt ON bt.oid = t.typbasetype
252        WHERE t.typtype = 'd'
253            AND n.nspname = ANY($1)
254        ORDER BY n.nspname, t.typname
255        "#,
256    )
257    .bind(schemas)
258    .fetch_all(pool)
259    .await?;
260
261    Ok(rows
262        .into_iter()
263        .map(|(schema, name, base_type)| DomainInfo {
264            schema_name: schema,
265            name,
266            base_type,
267        })
268        .collect())
269}