1use crate::error::Result;
2use sqlx::PgPool;
3
4use super::{ColumnInfo, CompositeTypeInfo, DomainInfo, EnumInfo, SchemaInfo, TableInfo};
5
6pub async fn introspect(
7 pool: &PgPool,
8 schemas: &[String],
9 include_views: bool,
10) -> Result<SchemaInfo> {
11 let tables = fetch_tables(pool, schemas).await?;
12 let views = if include_views {
13 fetch_views(pool, schemas).await?
14 } else {
15 Vec::new()
16 };
17 let enums = fetch_enums(pool, schemas).await?;
18 let composite_types = fetch_composite_types(pool, schemas).await?;
19 let domains = fetch_domains(pool, schemas).await?;
20
21 Ok(SchemaInfo {
22 tables,
23 views,
24 enums,
25 composite_types,
26 domains,
27 })
28}
29
30async fn fetch_tables(pool: &PgPool, schemas: &[String]) -> Result<Vec<TableInfo>> {
31 let rows = sqlx::query_as::<_, (String, String, String, String, String, String, i32, bool)>(
32 r#"
33 SELECT
34 c.table_schema,
35 c.table_name,
36 c.column_name,
37 c.data_type,
38 COALESCE(c.udt_name, c.data_type) as udt_name,
39 c.is_nullable,
40 c.ordinal_position,
41 CASE WHEN kcu.column_name IS NOT NULL THEN true ELSE false END AS is_primary_key
42 FROM information_schema.columns c
43 JOIN information_schema.tables t
44 ON t.table_schema = c.table_schema
45 AND t.table_name = c.table_name
46 AND t.table_type = 'BASE TABLE'
47 LEFT JOIN information_schema.table_constraints tc
48 ON tc.table_schema = c.table_schema
49 AND tc.table_name = c.table_name
50 AND tc.constraint_type = 'PRIMARY KEY'
51 LEFT JOIN information_schema.key_column_usage kcu
52 ON kcu.constraint_name = tc.constraint_name
53 AND kcu.constraint_schema = tc.constraint_schema
54 AND kcu.column_name = c.column_name
55 WHERE c.table_schema = ANY($1)
56 ORDER BY c.table_schema, c.table_name, c.ordinal_position
57 "#,
58 )
59 .bind(schemas)
60 .fetch_all(pool)
61 .await?;
62
63 let mut tables: Vec<TableInfo> = Vec::new();
64 let mut current_key: Option<(String, String)> = None;
65
66 for (schema, table, col_name, data_type, udt_name, nullable, ordinal, is_pk) in rows {
67 let key = (schema.clone(), table.clone());
68 if current_key.as_ref() != Some(&key) {
69 current_key = Some(key);
70 tables.push(TableInfo {
71 schema_name: schema.clone(),
72 name: table.clone(),
73 columns: Vec::new(),
74 });
75 }
76 tables.last_mut().unwrap().columns.push(ColumnInfo {
77 name: col_name,
78 data_type,
79 udt_name,
80 is_nullable: nullable == "YES",
81 is_primary_key: is_pk,
82 ordinal_position: ordinal,
83 schema_name: schema,
84 });
85 }
86
87 Ok(tables)
88}
89
90async fn fetch_views(pool: &PgPool, schemas: &[String]) -> Result<Vec<TableInfo>> {
91 let rows = sqlx::query_as::<_, (String, String, String, String, String, String, i32)>(
92 r#"
93 SELECT
94 c.table_schema,
95 c.table_name,
96 c.column_name,
97 c.data_type,
98 COALESCE(c.udt_name, c.data_type) as udt_name,
99 c.is_nullable,
100 c.ordinal_position
101 FROM information_schema.columns c
102 JOIN information_schema.tables t
103 ON t.table_schema = c.table_schema
104 AND t.table_name = c.table_name
105 AND t.table_type = 'VIEW'
106 WHERE c.table_schema = ANY($1)
107 ORDER BY c.table_schema, c.table_name, c.ordinal_position
108 "#,
109 )
110 .bind(schemas)
111 .fetch_all(pool)
112 .await?;
113
114 let mut views: Vec<TableInfo> = Vec::new();
115 let mut current_key: Option<(String, String)> = None;
116
117 for (schema, table, col_name, data_type, udt_name, nullable, ordinal) in rows {
118 let key = (schema.clone(), table.clone());
119 if current_key.as_ref() != Some(&key) {
120 current_key = Some(key);
121 views.push(TableInfo {
122 schema_name: schema.clone(),
123 name: table.clone(),
124 columns: Vec::new(),
125 });
126 }
127 views.last_mut().unwrap().columns.push(ColumnInfo {
128 name: col_name,
129 data_type,
130 udt_name,
131 is_nullable: nullable == "YES",
132 is_primary_key: false,
133 ordinal_position: ordinal,
134 schema_name: schema,
135 });
136 }
137
138 Ok(views)
139}
140
141async fn fetch_enums(pool: &PgPool, schemas: &[String]) -> Result<Vec<EnumInfo>> {
142 let rows = sqlx::query_as::<_, (String, String, String)>(
143 r#"
144 SELECT
145 n.nspname AS schema_name,
146 t.typname AS enum_name,
147 e.enumlabel AS variant
148 FROM pg_catalog.pg_type t
149 JOIN pg_catalog.pg_enum e ON e.enumtypid = t.oid
150 JOIN pg_catalog.pg_namespace n ON n.oid = t.typnamespace
151 WHERE n.nspname = ANY($1)
152 ORDER BY n.nspname, t.typname, e.enumsortorder
153 "#,
154 )
155 .bind(schemas)
156 .fetch_all(pool)
157 .await?;
158
159 let mut enums: Vec<EnumInfo> = Vec::new();
160 let mut current_key: Option<(String, String)> = None;
161
162 for (schema, name, variant) in rows {
163 let key = (schema.clone(), name.clone());
164 if current_key.as_ref() != Some(&key) {
165 current_key = Some(key);
166 enums.push(EnumInfo {
167 schema_name: schema,
168 name,
169 variants: Vec::new(),
170 });
171 }
172 enums.last_mut().unwrap().variants.push(variant);
173 }
174
175 Ok(enums)
176}
177
178async fn fetch_composite_types(
179 pool: &PgPool,
180 schemas: &[String],
181) -> Result<Vec<CompositeTypeInfo>> {
182 let rows = sqlx::query_as::<_, (String, String, String, String, String, i32)>(
183 r#"
184 SELECT
185 n.nspname AS schema_name,
186 t.typname AS type_name,
187 a.attname AS field_name,
188 COALESCE(ft.typname, '') AS field_type,
189 CASE WHEN a.attnotnull THEN 'NO' ELSE 'YES' END AS is_nullable,
190 a.attnum AS ordinal
191 FROM pg_catalog.pg_type t
192 JOIN pg_catalog.pg_namespace n ON n.oid = t.typnamespace
193 JOIN pg_catalog.pg_class c ON c.oid = t.typrelid
194 JOIN pg_catalog.pg_attribute a ON a.attrelid = c.oid AND a.attnum > 0 AND NOT a.attisdropped
195 JOIN pg_catalog.pg_type ft ON ft.oid = a.atttypid
196 WHERE t.typtype = 'c'
197 AND n.nspname = ANY($1)
198 AND NOT EXISTS (
199 SELECT 1 FROM information_schema.tables it
200 WHERE it.table_schema = n.nspname AND it.table_name = t.typname
201 )
202 ORDER BY n.nspname, t.typname, a.attnum
203 "#,
204 )
205 .bind(schemas)
206 .fetch_all(pool)
207 .await?;
208
209 let mut composites: Vec<CompositeTypeInfo> = Vec::new();
210 let mut current_key: Option<(String, String)> = None;
211
212 for (schema, type_name, field_name, field_type, nullable, ordinal) in rows {
213 let key = (schema.clone(), type_name.clone());
214 if current_key.as_ref() != Some(&key) {
215 current_key = Some(key);
216 composites.push(CompositeTypeInfo {
217 schema_name: schema.clone(),
218 name: type_name,
219 fields: Vec::new(),
220 });
221 }
222 composites.last_mut().unwrap().fields.push(ColumnInfo {
223 name: field_name,
224 data_type: field_type.clone(),
225 udt_name: field_type,
226 is_nullable: nullable == "YES",
227 is_primary_key: false,
228 ordinal_position: ordinal,
229 schema_name: schema,
230 });
231 }
232
233 Ok(composites)
234}
235
236async fn fetch_domains(pool: &PgPool, schemas: &[String]) -> Result<Vec<DomainInfo>> {
237 let rows = sqlx::query_as::<_, (String, String, String)>(
238 r#"
239 SELECT
240 n.nspname AS schema_name,
241 t.typname AS domain_name,
242 bt.typname AS base_type
243 FROM pg_catalog.pg_type t
244 JOIN pg_catalog.pg_namespace n ON n.oid = t.typnamespace
245 JOIN pg_catalog.pg_type bt ON bt.oid = t.typbasetype
246 WHERE t.typtype = 'd'
247 AND n.nspname = ANY($1)
248 ORDER BY n.nspname, t.typname
249 "#,
250 )
251 .bind(schemas)
252 .fetch_all(pool)
253 .await?;
254
255 Ok(rows
256 .into_iter()
257 .map(|(schema, name, base_type)| DomainInfo {
258 schema_name: schema,
259 name,
260 base_type,
261 })
262 .collect())
263}