1use crate::{
3 client::DbClient,
4 error::{DbError, DbResult},
5 introspection::Introspector,
6 metadata::*,
7 types::{TypeMapper, postgres::PostgresTypeMapper},
8};
9use sqlx::FromRow;
10use std::{collections::HashMap, sync::Arc};
11use tracing::{info, instrument, warn};
12
13#[derive(Debug, FromRow)]
17struct TableAndViewRow {
18 table_name: String,
19 table_type: String,
20}
21
22#[derive(Debug, FromRow, Clone)]
23struct ColumnIntrospectionRow {
24 column_name: String,
25 data_type: String,
26 udt_name: String,
27 is_nullable: String,
28 column_default: Option<String>,
29 column_comment: Option<String>,
30 is_primary_key: bool,
31}
32
33#[derive(Debug, FromRow)]
34struct ForeignKeyIntrospectionRow {
35 column_name: String,
36 foreign_table_schema: String,
37 foreign_table_name: String,
38 foreign_column_name: String,
39}
40
41#[derive(Debug, FromRow)]
42struct EnumIntrospectionRow {
43 enum_name: String,
44 enum_value: String,
45}
46
47pub struct PostgresIntrospector {
52 client: Arc<DbClient>,
53 type_mapper: PostgresTypeMapper,
54}
55
56impl PostgresIntrospector {
57 pub fn new(client: Arc<DbClient>) -> Self {
58 Self {
59 client,
60 type_mapper: PostgresTypeMapper,
61 }
62 }
63
64 #[instrument(skip(self), name = "list_db_entities")]
67 async fn list_tables_and_views(&self, schema_name: &str) -> DbResult<Vec<TableAndViewRow>> {
68 let query = "
69 SELECT
70 table_name::TEXT,
71 table_type::TEXT
72 FROM information_schema.tables
73 WHERE table_schema = $1
74 ORDER BY table_type, table_name;
75 ";
76 sqlx::query_as(query)
77 .bind(schema_name)
78 .fetch_all(&*self.client.pool)
79 .await
80 .map_err(DbError::from)
81 }
82
83 #[instrument(skip(self), name = "get_foreign_keys")]
85 async fn get_foreign_keys_for_table(
86 &self,
87 schema_name: &str,
88 table_name: &str,
89 ) -> DbResult<HashMap<String, ForeignKeyReference>> {
90 let query = r#"
91 SELECT
92 kcu.column_name::TEXT,
93 ccu.table_schema::TEXT AS foreign_table_schema,
94 ccu.table_name::TEXT AS foreign_table_name,
95 ccu.column_name::TEXT AS foreign_column_name
96 FROM information_schema.table_constraints AS tc
97 JOIN information_schema.key_column_usage AS kcu
98 ON tc.constraint_name = kcu.constraint_name AND tc.constraint_schema = kcu.constraint_schema
99 JOIN information_schema.constraint_column_usage AS ccu
100 ON ccu.constraint_name = tc.constraint_name AND ccu.constraint_schema = tc.constraint_schema
101 WHERE tc.constraint_type = 'FOREIGN KEY'
102 AND tc.table_schema = $1
103 AND tc.table_name = $2
104 "#;
105 let rows: Vec<ForeignKeyIntrospectionRow> = sqlx::query_as(query)
106 .bind(schema_name)
107 .bind(table_name)
108 .fetch_all(&*self.client.pool)
109 .await?;
110 Ok(rows
111 .into_iter()
112 .map(|row| {
113 (
114 row.column_name,
115 ForeignKeyReference {
116 schema: row.foreign_table_schema,
117 table: row.foreign_table_name,
118 column: row.foreign_column_name,
119 },
120 )
121 })
122 .collect())
123 }
124}
125
126#[async_trait::async_trait]
131impl Introspector for PostgresIntrospector {
132 #[instrument(skip(self), name = "introspect_database")]
133 async fn introspect(&self, schemas: &[String]) -> DbResult<DatabaseMetadata> {
134 info!(
135 "Starting full database introspection for schemas: {:?}",
136 schemas
137 );
138 let mut db_meta = DatabaseMetadata::default();
139 for schema_name in schemas {
140 match self.introspect_schema(schema_name).await {
141 Ok(schema_meta) => {
142 db_meta.schemas.insert(schema_name.clone(), schema_meta);
143 }
144 Err(e) => warn!("Could not introspect schema '{}': {}", schema_name, e),
145 }
146 }
147 info!("Database introspection complete.");
148 Ok(db_meta)
149 }
150
151 #[instrument(skip(self), name = "introspect_schema")]
152 async fn introspect_schema(&self, schema_name: &str) -> DbResult<SchemaMetadata> {
153 let mut schema_meta = SchemaMetadata {
154 name: schema_name.to_string(),
155 ..Default::default()
156 };
157
158 let (entities_result, enums_result) = tokio::join!(
160 self.list_tables_and_views(schema_name),
161 self.introspect_enums_for_schema(schema_name)
162 );
163
164 schema_meta.enums = enums_result?;
165
166 for entity in entities_result? {
167 if entity.table_type == "BASE TABLE" {
168 match self.introspect_table(schema_name, &entity.table_name).await {
169 Ok(table_md) => {
170 schema_meta.tables.insert(entity.table_name, table_md);
171 }
172 Err(e) => warn!(
173 "Skipping table {}.{}: {}",
174 schema_name, entity.table_name, e
175 ),
176 }
177 } else if entity.table_type == "VIEW" {
178 match self.introspect_view(schema_name, &entity.table_name).await {
179 Ok(view_md) => {
180 schema_meta.views.insert(entity.table_name, view_md);
181 }
182 Err(e) => warn!("Skipping view {}.{}: {}", schema_name, entity.table_name, e),
183 }
184 }
185 }
186
187 Ok(schema_meta)
188 }
189
190 #[instrument(skip(self, table_name), name = "introspect_table")]
191 async fn introspect_table(
192 &self,
193 schema_name: &str,
194 table_name: &str,
195 ) -> DbResult<TableMetadata> {
196 let columns_query = r#"
197 SELECT
198 c.column_name::TEXT,
199 c.data_type::TEXT,
200 c.udt_name::TEXT,
201 c.is_nullable::TEXT,
202 c.column_default,
203 pg_catalog.col_description((quote_ident(c.table_schema) || '.' || quote_ident(c.table_name))::regclass::oid, c.ordinal_position) AS column_comment,
204 EXISTS (
205 SELECT 1 FROM information_schema.table_constraints tc
206 JOIN information_schema.key_column_usage kcu ON tc.constraint_name = kcu.constraint_name AND tc.constraint_schema = kcu.constraint_schema
207 WHERE tc.table_schema = c.table_schema AND tc.table_name = c.table_name AND kcu.column_name = c.column_name AND tc.constraint_type = 'PRIMARY KEY'
208 ) AS is_primary_key
209 FROM information_schema.columns c
210 WHERE c.table_schema = $1 AND c.table_name = $2
211 ORDER BY c.ordinal_position;
212 "#;
213
214 let (columns_result, fks_result) = tokio::join!(
215 sqlx::query_as::<_, ColumnIntrospectionRow>(columns_query)
216 .bind(schema_name)
217 .bind(table_name)
218 .fetch_all(&*self.client.pool),
219 self.get_foreign_keys_for_table(schema_name, table_name)
220 );
221
222 let column_rows = columns_result?;
223 let foreign_keys = fks_result?;
224
225 if column_rows.is_empty() {
226 return Err(DbError::Introspection(format!(
227 "Table {}.{} not found or has no columns",
228 schema_name, table_name
229 )));
230 }
231
232 let mut columns = Vec::new();
233 let mut primary_key_columns = Vec::new();
234
235 for row in column_rows {
236 if row.is_primary_key {
237 primary_key_columns.push(row.column_name.clone());
238 }
239 let foreign_key = foreign_keys.get(&row.column_name).cloned();
240
241 columns.push(ColumnMetadata {
242 name: row.column_name,
243 sql_type_name: row.data_type.clone(),
244 axion_type: self
245 .type_mapper
246 .sql_to_axion(&row.data_type, Some(&row.udt_name)),
247 is_nullable: row.is_nullable.to_lowercase() == "yes",
248 is_primary_key: row.is_primary_key,
249 default_value: row.column_default,
250 comment: row.column_comment,
251 foreign_key,
252 });
253 }
254
255 Ok(TableMetadata {
256 name: table_name.to_string(),
257 schema: schema_name.to_string(),
258 columns,
259 primary_key_columns,
260 comment: None, })
262 }
263
264 #[instrument(skip(self, view_name), name = "introspect_view")]
267 async fn introspect_view(&self, schema_name: &str, view_name: &str) -> DbResult<ViewMetadata> {
268 let columns_query = r#"
269 SELECT
270 c.column_name::TEXT,
271 c.data_type::TEXT,
272 c.udt_name::TEXT,
273 c.is_nullable::TEXT,
274 c.column_default,
275 pg_catalog.col_description((quote_ident(c.table_schema) || '.' || quote_ident(c.table_name))::regclass::oid, c.ordinal_position) AS column_comment,
276 -- Views do not have primary keys, so this is always false.
277 false AS is_primary_key
278 FROM information_schema.columns c
279 WHERE c.table_schema = $1 AND c.table_name = $2
280 ORDER BY c.ordinal_position;
281 "#;
282
283 let definition_query = "
284 SELECT view_definition::TEXT FROM information_schema.views
285 WHERE table_schema = $1 AND table_name = $2
286 ";
287
288 let (columns_result, definition_result) = tokio::join!(
289 sqlx::query_as::<_, ColumnIntrospectionRow>(columns_query)
290 .bind(schema_name)
291 .bind(view_name)
292 .fetch_all(&*self.client.pool),
293 sqlx::query_scalar::<_, Option<String>>(definition_query)
294 .bind(schema_name)
295 .bind(view_name)
296 .fetch_one(&*self.client.pool)
297 );
298
299 let column_rows = columns_result?;
300 let definition = definition_result?;
301
302 let columns = column_rows
303 .into_iter()
304 .map(|row| ColumnMetadata {
305 name: row.column_name,
306 sql_type_name: row.data_type.clone(),
307 axion_type: self
308 .type_mapper
309 .sql_to_axion(&row.data_type, Some(&row.udt_name)),
310 is_nullable: row.is_nullable.to_lowercase() == "yes",
311 is_primary_key: false, default_value: row.column_default,
313 comment: row.column_comment,
314 foreign_key: None, })
316 .collect();
317
318 Ok(ViewMetadata {
319 name: view_name.to_string(),
320 schema: schema_name.to_string(),
321 columns,
322 definition,
323 comment: None, })
325 }
326
327 #[instrument(skip(self), name = "introspect_schema_enums")]
328 async fn introspect_enums_for_schema(
329 &self,
330 schema_name: &str,
331 ) -> DbResult<HashMap<String, EnumMetadata>> {
332 let query = "
333 SELECT
334 t.typname::TEXT AS enum_name,
335 e.enumlabel::TEXT AS enum_value
336 FROM pg_catalog.pg_type t
337 JOIN pg_catalog.pg_namespace n ON n.oid = t.typnamespace
338 JOIN pg_catalog.pg_enum e ON t.oid = e.enumtypid
339 WHERE n.nspname = $1 AND t.typtype = 'e'
340 ORDER BY enum_name, e.enumsortorder;
341 ";
342
343 let rows: Vec<EnumIntrospectionRow> = sqlx::query_as(query)
344 .bind(schema_name)
345 .fetch_all(&*self.client.pool)
346 .await?;
347
348 let mut enums = HashMap::new();
349 for row in rows {
350 enums
351 .entry(row.enum_name.clone())
352 .or_insert_with(|| EnumMetadata {
353 name: row.enum_name,
354 schema: schema_name.to_string(),
355 ..Default::default()
356 })
357 .values
358 .push(row.enum_value);
359 }
360 Ok(enums)
361 }
362
363 #[instrument(skip(self), name = "list_user_schemas")]
365 async fn list_user_schemas(&self) -> DbResult<Vec<String>> {
366 let query = "
367 SELECT nspname::TEXT AS schema_name
368 FROM pg_catalog.pg_namespace
369 WHERE nspname NOT IN ('information_schema', 'pg_catalog', 'pg_toast')
370 AND nspname NOT LIKE 'pg_temp_%'
371 ORDER BY schema_name;
372 ";
373 let rows: Vec<(String,)> = sqlx::query_as(query).fetch_all(&*self.client.pool).await?;
374 Ok(rows.into_iter().map(|r| r.0).collect())
375 }
376}