1use crate::error::Result;
2use sqlx::PgPool;
3
4use super::{ColumnInfo, CompositeTypeInfo, DomainInfo, EnumInfo, SchemaInfo, TableInfo};
5
6pub async fn introspect(
7 pool: &PgPool,
8 schemas: &[String],
9 include_views: bool,
10) -> Result<SchemaInfo> {
11 let tables = fetch_tables(pool, schemas).await?;
12 let views = if include_views {
13 fetch_views(pool, schemas).await?
14 } else {
15 Vec::new()
16 };
17 let enums = fetch_enums(pool, schemas).await?;
18 let composite_types = fetch_composite_types(pool, schemas).await?;
19 let domains = fetch_domains(pool, schemas).await?;
20
21 Ok(SchemaInfo {
22 tables,
23 views,
24 enums,
25 composite_types,
26 domains,
27 })
28}
29
30async fn fetch_tables(pool: &PgPool, schemas: &[String]) -> Result<Vec<TableInfo>> {
31 let rows = sqlx::query_as::<_, (String, String, String, String, String, String, i32, bool, Option<String>)>(
32 r#"
33 SELECT
34 c.table_schema,
35 c.table_name,
36 c.column_name,
37 c.data_type,
38 COALESCE(c.udt_name, c.data_type) as udt_name,
39 c.is_nullable,
40 c.ordinal_position,
41 CASE WHEN kcu.column_name IS NOT NULL THEN true ELSE false END AS is_primary_key,
42 c.column_default
43 FROM information_schema.columns c
44 JOIN information_schema.tables t
45 ON t.table_schema = c.table_schema
46 AND t.table_name = c.table_name
47 AND t.table_type = 'BASE TABLE'
48 LEFT JOIN information_schema.table_constraints tc
49 ON tc.table_schema = c.table_schema
50 AND tc.table_name = c.table_name
51 AND tc.constraint_type = 'PRIMARY KEY'
52 LEFT JOIN information_schema.key_column_usage kcu
53 ON kcu.constraint_name = tc.constraint_name
54 AND kcu.constraint_schema = tc.constraint_schema
55 AND kcu.column_name = c.column_name
56 WHERE c.table_schema = ANY($1)
57 ORDER BY c.table_schema, c.table_name, c.ordinal_position
58 "#,
59 )
60 .bind(schemas)
61 .fetch_all(pool)
62 .await?;
63
64 let mut tables: Vec<TableInfo> = Vec::new();
65 let mut current_key: Option<(String, String)> = None;
66
67 for (schema, table, col_name, data_type, udt_name, nullable, ordinal, is_pk, column_default) in rows {
68 let key = (schema.clone(), table.clone());
69 if current_key.as_ref() != Some(&key) {
70 current_key = Some(key);
71 tables.push(TableInfo {
72 schema_name: schema.clone(),
73 name: table.clone(),
74 columns: Vec::new(),
75 });
76 }
77 tables.last_mut().unwrap().columns.push(ColumnInfo {
78 name: col_name,
79 data_type,
80 udt_name,
81 is_nullable: nullable == "YES",
82 is_primary_key: is_pk,
83 ordinal_position: ordinal,
84 schema_name: schema,
85 column_default,
86 });
87 }
88
89 Ok(tables)
90}
91
92async fn fetch_views(pool: &PgPool, schemas: &[String]) -> Result<Vec<TableInfo>> {
93 let rows = sqlx::query_as::<_, (String, String, String, String, String, String, i32, Option<String>)>(
94 r#"
95 SELECT
96 c.table_schema,
97 c.table_name,
98 c.column_name,
99 c.data_type,
100 COALESCE(c.udt_name, c.data_type) as udt_name,
101 c.is_nullable,
102 c.ordinal_position,
103 c.column_default
104 FROM information_schema.columns c
105 JOIN information_schema.tables t
106 ON t.table_schema = c.table_schema
107 AND t.table_name = c.table_name
108 AND t.table_type = 'VIEW'
109 WHERE c.table_schema = ANY($1)
110 ORDER BY c.table_schema, c.table_name, c.ordinal_position
111 "#,
112 )
113 .bind(schemas)
114 .fetch_all(pool)
115 .await?;
116
117 let mut views: Vec<TableInfo> = Vec::new();
118 let mut current_key: Option<(String, String)> = None;
119
120 for (schema, table, col_name, data_type, udt_name, nullable, ordinal, column_default) in rows {
121 let key = (schema.clone(), table.clone());
122 if current_key.as_ref() != Some(&key) {
123 current_key = Some(key);
124 views.push(TableInfo {
125 schema_name: schema.clone(),
126 name: table.clone(),
127 columns: Vec::new(),
128 });
129 }
130 views.last_mut().unwrap().columns.push(ColumnInfo {
131 name: col_name,
132 data_type,
133 udt_name,
134 is_nullable: nullable == "YES",
135 is_primary_key: false,
136 ordinal_position: ordinal,
137 schema_name: schema,
138 column_default,
139 });
140 }
141
142 Ok(views)
143}
144
145async fn fetch_enums(pool: &PgPool, schemas: &[String]) -> Result<Vec<EnumInfo>> {
146 let rows = sqlx::query_as::<_, (String, String, String)>(
147 r#"
148 SELECT
149 n.nspname AS schema_name,
150 t.typname AS enum_name,
151 e.enumlabel AS variant
152 FROM pg_catalog.pg_type t
153 JOIN pg_catalog.pg_enum e ON e.enumtypid = t.oid
154 JOIN pg_catalog.pg_namespace n ON n.oid = t.typnamespace
155 WHERE n.nspname = ANY($1)
156 ORDER BY n.nspname, t.typname, e.enumsortorder
157 "#,
158 )
159 .bind(schemas)
160 .fetch_all(pool)
161 .await?;
162
163 let mut enums: Vec<EnumInfo> = Vec::new();
164 let mut current_key: Option<(String, String)> = None;
165
166 for (schema, name, variant) in rows {
167 let key = (schema.clone(), name.clone());
168 if current_key.as_ref() != Some(&key) {
169 current_key = Some(key);
170 enums.push(EnumInfo {
171 schema_name: schema,
172 name,
173 variants: Vec::new(),
174 default_variant: None,
175 });
176 }
177 enums.last_mut().unwrap().variants.push(variant);
178 }
179
180 Ok(enums)
181}
182
183async fn fetch_composite_types(
184 pool: &PgPool,
185 schemas: &[String],
186) -> Result<Vec<CompositeTypeInfo>> {
187 let rows = sqlx::query_as::<_, (String, String, String, String, String, i32)>(
188 r#"
189 SELECT
190 n.nspname AS schema_name,
191 t.typname AS type_name,
192 a.attname AS field_name,
193 COALESCE(ft.typname, '') AS field_type,
194 CASE WHEN a.attnotnull THEN 'NO' ELSE 'YES' END AS is_nullable,
195 a.attnum AS ordinal
196 FROM pg_catalog.pg_type t
197 JOIN pg_catalog.pg_namespace n ON n.oid = t.typnamespace
198 JOIN pg_catalog.pg_class c ON c.oid = t.typrelid
199 JOIN pg_catalog.pg_attribute a ON a.attrelid = c.oid AND a.attnum > 0 AND NOT a.attisdropped
200 JOIN pg_catalog.pg_type ft ON ft.oid = a.atttypid
201 WHERE t.typtype = 'c'
202 AND n.nspname = ANY($1)
203 AND NOT EXISTS (
204 SELECT 1 FROM information_schema.tables it
205 WHERE it.table_schema = n.nspname AND it.table_name = t.typname
206 )
207 ORDER BY n.nspname, t.typname, a.attnum
208 "#,
209 )
210 .bind(schemas)
211 .fetch_all(pool)
212 .await?;
213
214 let mut composites: Vec<CompositeTypeInfo> = Vec::new();
215 let mut current_key: Option<(String, String)> = None;
216
217 for (schema, type_name, field_name, field_type, nullable, ordinal) in rows {
218 let key = (schema.clone(), type_name.clone());
219 if current_key.as_ref() != Some(&key) {
220 current_key = Some(key);
221 composites.push(CompositeTypeInfo {
222 schema_name: schema.clone(),
223 name: type_name,
224 fields: Vec::new(),
225 });
226 }
227 composites.last_mut().unwrap().fields.push(ColumnInfo {
228 name: field_name,
229 data_type: field_type.clone(),
230 udt_name: field_type,
231 is_nullable: nullable == "YES",
232 is_primary_key: false,
233 ordinal_position: ordinal,
234 schema_name: schema,
235 column_default: None,
236 });
237 }
238
239 Ok(composites)
240}
241
242async fn fetch_domains(pool: &PgPool, schemas: &[String]) -> Result<Vec<DomainInfo>> {
243 let rows = sqlx::query_as::<_, (String, String, String)>(
244 r#"
245 SELECT
246 n.nspname AS schema_name,
247 t.typname AS domain_name,
248 bt.typname AS base_type
249 FROM pg_catalog.pg_type t
250 JOIN pg_catalog.pg_namespace n ON n.oid = t.typnamespace
251 JOIN pg_catalog.pg_type bt ON bt.oid = t.typbasetype
252 WHERE t.typtype = 'd'
253 AND n.nspname = ANY($1)
254 ORDER BY n.nspname, t.typname
255 "#,
256 )
257 .bind(schemas)
258 .fetch_all(pool)
259 .await?;
260
261 Ok(rows
262 .into_iter()
263 .map(|(schema, name, base_type)| DomainInfo {
264 schema_name: schema,
265 name,
266 base_type,
267 })
268 .collect())
269}