Skip to main content

sqlx_gen/introspect/
postgres.rs

1use crate::error::Result;
2use sqlx::PgPool;
3
4use super::{ColumnInfo, CompositeTypeInfo, DomainInfo, EnumInfo, SchemaInfo, TableInfo};
5
6pub async fn introspect(
7    pool: &PgPool,
8    schemas: &[String],
9    include_views: bool,
10) -> Result<SchemaInfo> {
11    let tables = fetch_tables(pool, schemas).await?;
12    let views = if include_views {
13        fetch_views(pool, schemas).await?
14    } else {
15        Vec::new()
16    };
17    let enums = fetch_enums(pool, schemas).await?;
18    let composite_types = fetch_composite_types(pool, schemas).await?;
19    let domains = fetch_domains(pool, schemas).await?;
20
21    Ok(SchemaInfo {
22        tables,
23        views,
24        enums,
25        composite_types,
26        domains,
27    })
28}
29
30async fn fetch_tables(pool: &PgPool, schemas: &[String]) -> Result<Vec<TableInfo>> {
31    let rows = sqlx::query_as::<_, (String, String, String, String, String, String, i32, bool)>(
32        r#"
33        SELECT
34            c.table_schema,
35            c.table_name,
36            c.column_name,
37            c.data_type,
38            COALESCE(c.udt_name, c.data_type) as udt_name,
39            c.is_nullable,
40            c.ordinal_position,
41            CASE WHEN kcu.column_name IS NOT NULL THEN true ELSE false END AS is_primary_key
42        FROM information_schema.columns c
43        JOIN information_schema.tables t
44            ON t.table_schema = c.table_schema
45            AND t.table_name = c.table_name
46            AND t.table_type = 'BASE TABLE'
47        LEFT JOIN information_schema.table_constraints tc
48            ON tc.table_schema = c.table_schema
49            AND tc.table_name = c.table_name
50            AND tc.constraint_type = 'PRIMARY KEY'
51        LEFT JOIN information_schema.key_column_usage kcu
52            ON kcu.constraint_name = tc.constraint_name
53            AND kcu.constraint_schema = tc.constraint_schema
54            AND kcu.column_name = c.column_name
55        WHERE c.table_schema = ANY($1)
56        ORDER BY c.table_schema, c.table_name, c.ordinal_position
57        "#,
58    )
59    .bind(schemas)
60    .fetch_all(pool)
61    .await?;
62
63    let mut tables: Vec<TableInfo> = Vec::new();
64    let mut current_key: Option<(String, String)> = None;
65
66    for (schema, table, col_name, data_type, udt_name, nullable, ordinal, is_pk) in rows {
67        let key = (schema.clone(), table.clone());
68        if current_key.as_ref() != Some(&key) {
69            current_key = Some(key);
70            tables.push(TableInfo {
71                schema_name: schema.clone(),
72                name: table.clone(),
73                columns: Vec::new(),
74            });
75        }
76        tables.last_mut().unwrap().columns.push(ColumnInfo {
77            name: col_name,
78            data_type,
79            udt_name,
80            is_nullable: nullable == "YES",
81            is_primary_key: is_pk,
82            ordinal_position: ordinal,
83            schema_name: schema,
84        });
85    }
86
87    Ok(tables)
88}
89
90async fn fetch_views(pool: &PgPool, schemas: &[String]) -> Result<Vec<TableInfo>> {
91    let rows = sqlx::query_as::<_, (String, String, String, String, String, String, i32)>(
92        r#"
93        SELECT
94            c.table_schema,
95            c.table_name,
96            c.column_name,
97            c.data_type,
98            COALESCE(c.udt_name, c.data_type) as udt_name,
99            c.is_nullable,
100            c.ordinal_position
101        FROM information_schema.columns c
102        JOIN information_schema.tables t
103            ON t.table_schema = c.table_schema
104            AND t.table_name = c.table_name
105            AND t.table_type = 'VIEW'
106        WHERE c.table_schema = ANY($1)
107        ORDER BY c.table_schema, c.table_name, c.ordinal_position
108        "#,
109    )
110    .bind(schemas)
111    .fetch_all(pool)
112    .await?;
113
114    let mut views: Vec<TableInfo> = Vec::new();
115    let mut current_key: Option<(String, String)> = None;
116
117    for (schema, table, col_name, data_type, udt_name, nullable, ordinal) in rows {
118        let key = (schema.clone(), table.clone());
119        if current_key.as_ref() != Some(&key) {
120            current_key = Some(key);
121            views.push(TableInfo {
122                schema_name: schema.clone(),
123                name: table.clone(),
124                columns: Vec::new(),
125            });
126        }
127        views.last_mut().unwrap().columns.push(ColumnInfo {
128            name: col_name,
129            data_type,
130            udt_name,
131            is_nullable: nullable == "YES",
132            is_primary_key: false,
133            ordinal_position: ordinal,
134            schema_name: schema,
135        });
136    }
137
138    Ok(views)
139}
140
141async fn fetch_enums(pool: &PgPool, schemas: &[String]) -> Result<Vec<EnumInfo>> {
142    let rows = sqlx::query_as::<_, (String, String, String)>(
143        r#"
144        SELECT
145            n.nspname AS schema_name,
146            t.typname AS enum_name,
147            e.enumlabel AS variant
148        FROM pg_catalog.pg_type t
149        JOIN pg_catalog.pg_enum e ON e.enumtypid = t.oid
150        JOIN pg_catalog.pg_namespace n ON n.oid = t.typnamespace
151        WHERE n.nspname = ANY($1)
152        ORDER BY n.nspname, t.typname, e.enumsortorder
153        "#,
154    )
155    .bind(schemas)
156    .fetch_all(pool)
157    .await?;
158
159    let mut enums: Vec<EnumInfo> = Vec::new();
160    let mut current_key: Option<(String, String)> = None;
161
162    for (schema, name, variant) in rows {
163        let key = (schema.clone(), name.clone());
164        if current_key.as_ref() != Some(&key) {
165            current_key = Some(key);
166            enums.push(EnumInfo {
167                schema_name: schema,
168                name,
169                variants: Vec::new(),
170            });
171        }
172        enums.last_mut().unwrap().variants.push(variant);
173    }
174
175    Ok(enums)
176}
177
178async fn fetch_composite_types(
179    pool: &PgPool,
180    schemas: &[String],
181) -> Result<Vec<CompositeTypeInfo>> {
182    let rows = sqlx::query_as::<_, (String, String, String, String, String, i32)>(
183        r#"
184        SELECT
185            n.nspname AS schema_name,
186            t.typname AS type_name,
187            a.attname AS field_name,
188            COALESCE(ft.typname, '') AS field_type,
189            CASE WHEN a.attnotnull THEN 'NO' ELSE 'YES' END AS is_nullable,
190            a.attnum AS ordinal
191        FROM pg_catalog.pg_type t
192        JOIN pg_catalog.pg_namespace n ON n.oid = t.typnamespace
193        JOIN pg_catalog.pg_class c ON c.oid = t.typrelid
194        JOIN pg_catalog.pg_attribute a ON a.attrelid = c.oid AND a.attnum > 0 AND NOT a.attisdropped
195        JOIN pg_catalog.pg_type ft ON ft.oid = a.atttypid
196        WHERE t.typtype = 'c'
197            AND n.nspname = ANY($1)
198            AND NOT EXISTS (
199                SELECT 1 FROM information_schema.tables it
200                WHERE it.table_schema = n.nspname AND it.table_name = t.typname
201            )
202        ORDER BY n.nspname, t.typname, a.attnum
203        "#,
204    )
205    .bind(schemas)
206    .fetch_all(pool)
207    .await?;
208
209    let mut composites: Vec<CompositeTypeInfo> = Vec::new();
210    let mut current_key: Option<(String, String)> = None;
211
212    for (schema, type_name, field_name, field_type, nullable, ordinal) in rows {
213        let key = (schema.clone(), type_name.clone());
214        if current_key.as_ref() != Some(&key) {
215            current_key = Some(key);
216            composites.push(CompositeTypeInfo {
217                schema_name: schema.clone(),
218                name: type_name,
219                fields: Vec::new(),
220            });
221        }
222        composites.last_mut().unwrap().fields.push(ColumnInfo {
223            name: field_name,
224            data_type: field_type.clone(),
225            udt_name: field_type,
226            is_nullable: nullable == "YES",
227            is_primary_key: false,
228            ordinal_position: ordinal,
229            schema_name: schema,
230        });
231    }
232
233    Ok(composites)
234}
235
236async fn fetch_domains(pool: &PgPool, schemas: &[String]) -> Result<Vec<DomainInfo>> {
237    let rows = sqlx::query_as::<_, (String, String, String)>(
238        r#"
239        SELECT
240            n.nspname AS schema_name,
241            t.typname AS domain_name,
242            bt.typname AS base_type
243        FROM pg_catalog.pg_type t
244        JOIN pg_catalog.pg_namespace n ON n.oid = t.typnamespace
245        JOIN pg_catalog.pg_type bt ON bt.oid = t.typbasetype
246        WHERE t.typtype = 'd'
247            AND n.nspname = ANY($1)
248        ORDER BY n.nspname, t.typname
249        "#,
250    )
251    .bind(schemas)
252    .fetch_all(pool)
253    .await?;
254
255    Ok(rows
256        .into_iter()
257        .map(|(schema, name, base_type)| DomainInfo {
258            schema_name: schema,
259            name,
260            base_type,
261        })
262        .collect())
263}