axion_db/introspection/
postgres.rs

1// axion-db/src/introspection/postgres.rs
2use crate::{
3    client::DbClient,
4    error::{DbError, DbResult},
5    introspection::Introspector,
6    metadata::*,
7    types::{TypeMapper, postgres::PostgresTypeMapper},
8};
9use sqlx::FromRow;
10use std::{collections::HashMap, sync::Arc};
11use tracing::{info, instrument, warn};
12
13// =================================================================================
14//  1. FromRow Structs (Unchanged)
15// =================================================================================
16#[derive(Debug, FromRow)]
17struct TableAndViewRow {
18    table_name: String,
19    table_type: String,
20}
21
22#[derive(Debug, FromRow, Clone)]
23struct ColumnIntrospectionRow {
24    column_name: String,
25    data_type: String,
26    udt_name: String,
27    is_nullable: String,
28    column_default: Option<String>,
29    column_comment: Option<String>,
30    is_primary_key: bool,
31}
32
33#[derive(Debug, FromRow)]
34struct ForeignKeyIntrospectionRow {
35    column_name: String,
36    foreign_table_schema: String,
37    foreign_table_name: String,
38    foreign_column_name: String,
39}
40
41#[derive(Debug, FromRow)]
42struct EnumIntrospectionRow {
43    enum_name: String,
44    enum_value: String,
45}
46
47// =================================================================================
48//  2. The Introspector Implementation
49// =================================================================================
50
51pub struct PostgresIntrospector {
52    client: Arc<DbClient>,
53    type_mapper: PostgresTypeMapper,
54}
55
56impl PostgresIntrospector {
57    pub fn new(client: Arc<DbClient>) -> Self {
58        Self {
59            client,
60            type_mapper: PostgresTypeMapper,
61        }
62    }
63
64    // --- Helper Methods using our validated queries ---
65
66    #[instrument(skip(self), name = "list_db_entities")]
67    async fn list_tables_and_views(&self, schema_name: &str) -> DbResult<Vec<TableAndViewRow>> {
68        let query = "
69            SELECT
70                table_name::TEXT,
71                table_type::TEXT
72            FROM information_schema.tables
73            WHERE table_schema = $1
74            ORDER BY table_type, table_name;
75        ";
76        sqlx::query_as(query)
77            .bind(schema_name)
78            .fetch_all(&*self.client.pool)
79            .await
80            .map_err(DbError::from)
81    }
82
83    // (get_foreign_keys_for_table remains unchanged)
84    #[instrument(skip(self), name = "get_foreign_keys")]
85    async fn get_foreign_keys_for_table(
86        &self,
87        schema_name: &str,
88        table_name: &str,
89    ) -> DbResult<HashMap<String, ForeignKeyReference>> {
90        let query = r#"
91            SELECT
92                kcu.column_name::TEXT,
93                ccu.table_schema::TEXT AS foreign_table_schema,
94                ccu.table_name::TEXT AS foreign_table_name,
95                ccu.column_name::TEXT AS foreign_column_name
96            FROM information_schema.table_constraints AS tc
97            JOIN information_schema.key_column_usage AS kcu
98                ON tc.constraint_name = kcu.constraint_name AND tc.constraint_schema = kcu.constraint_schema
99            JOIN information_schema.constraint_column_usage AS ccu
100                ON ccu.constraint_name = tc.constraint_name AND ccu.constraint_schema = tc.constraint_schema
101            WHERE tc.constraint_type = 'FOREIGN KEY'
102            AND tc.table_schema = $1
103            AND tc.table_name = $2
104        "#;
105        let rows: Vec<ForeignKeyIntrospectionRow> = sqlx::query_as(query)
106            .bind(schema_name)
107            .bind(table_name)
108            .fetch_all(&*self.client.pool)
109            .await?;
110        Ok(rows
111            .into_iter()
112            .map(|row| {
113                (
114                    row.column_name,
115                    ForeignKeyReference {
116                        schema: row.foreign_table_schema,
117                        table: row.foreign_table_name,
118                        column: row.foreign_column_name,
119                    },
120                )
121            })
122            .collect())
123    }
124}
125
126// =================================================================================
127//  3. The Main Introspector Trait Implementation (Now with View/Enum Logic)
128// =================================================================================
129
130#[async_trait::async_trait]
131impl Introspector for PostgresIntrospector {
132    #[instrument(skip(self), name = "introspect_database")]
133    async fn introspect(&self, schemas: &[String]) -> DbResult<DatabaseMetadata> {
134        info!(
135            "Starting full database introspection for schemas: {:?}",
136            schemas
137        );
138        let mut db_meta = DatabaseMetadata::default();
139        for schema_name in schemas {
140            match self.introspect_schema(schema_name).await {
141                Ok(schema_meta) => {
142                    db_meta.schemas.insert(schema_name.clone(), schema_meta);
143                }
144                Err(e) => warn!("Could not introspect schema '{}': {}", schema_name, e),
145            }
146        }
147        info!("Database introspection complete.");
148        Ok(db_meta)
149    }
150
151    #[instrument(skip(self), name = "introspect_schema")]
152    async fn introspect_schema(&self, schema_name: &str) -> DbResult<SchemaMetadata> {
153        let mut schema_meta = SchemaMetadata {
154            name: schema_name.to_string(),
155            ..Default::default()
156        };
157
158        // Fetch all entities and enums for the schema concurrently
159        let (entities_result, enums_result) = tokio::join!(
160            self.list_tables_and_views(schema_name),
161            self.introspect_enums_for_schema(schema_name)
162        );
163
164        schema_meta.enums = enums_result?;
165
166        for entity in entities_result? {
167            if entity.table_type == "BASE TABLE" {
168                match self.introspect_table(schema_name, &entity.table_name).await {
169                    Ok(table_md) => {
170                        schema_meta.tables.insert(entity.table_name, table_md);
171                    }
172                    Err(e) => warn!(
173                        "Skipping table {}.{}: {}",
174                        schema_name, entity.table_name, e
175                    ),
176                }
177            } else if entity.table_type == "VIEW" {
178                match self.introspect_view(schema_name, &entity.table_name).await {
179                    Ok(view_md) => {
180                        schema_meta.views.insert(entity.table_name, view_md);
181                    }
182                    Err(e) => warn!("Skipping view {}.{}: {}", schema_name, entity.table_name, e),
183                }
184            }
185        }
186
187        Ok(schema_meta)
188    }
189
190    #[instrument(skip(self, table_name), name = "introspect_table")]
191    async fn introspect_table(
192        &self,
193        schema_name: &str,
194        table_name: &str,
195    ) -> DbResult<TableMetadata> {
196        let columns_query = r#"
197            SELECT
198                c.column_name::TEXT,
199                c.data_type::TEXT,
200                c.udt_name::TEXT,
201                c.is_nullable::TEXT,
202                c.column_default,
203                pg_catalog.col_description((quote_ident(c.table_schema) || '.' || quote_ident(c.table_name))::regclass::oid, c.ordinal_position) AS column_comment,
204                EXISTS (
205                    SELECT 1 FROM information_schema.table_constraints tc
206                    JOIN information_schema.key_column_usage kcu ON tc.constraint_name = kcu.constraint_name AND tc.constraint_schema = kcu.constraint_schema
207                    WHERE tc.table_schema = c.table_schema AND tc.table_name = c.table_name AND kcu.column_name = c.column_name AND tc.constraint_type = 'PRIMARY KEY'
208                ) AS is_primary_key
209            FROM information_schema.columns c
210            WHERE c.table_schema = $1 AND c.table_name = $2
211            ORDER BY c.ordinal_position;
212        "#;
213
214        let (columns_result, fks_result) = tokio::join!(
215            sqlx::query_as::<_, ColumnIntrospectionRow>(columns_query)
216                .bind(schema_name)
217                .bind(table_name)
218                .fetch_all(&*self.client.pool),
219            self.get_foreign_keys_for_table(schema_name, table_name)
220        );
221
222        let column_rows = columns_result?;
223        let foreign_keys = fks_result?;
224
225        if column_rows.is_empty() {
226            return Err(DbError::Introspection(format!(
227                "Table {}.{} not found or has no columns",
228                schema_name, table_name
229            )));
230        }
231
232        let mut columns = Vec::new();
233        let mut primary_key_columns = Vec::new();
234
235        for row in column_rows {
236            if row.is_primary_key {
237                primary_key_columns.push(row.column_name.clone());
238            }
239            let foreign_key = foreign_keys.get(&row.column_name).cloned();
240
241            columns.push(ColumnMetadata {
242                name: row.column_name,
243                sql_type_name: row.data_type.clone(),
244                axion_type: self
245                    .type_mapper
246                    .sql_to_axion(&row.data_type, Some(&row.udt_name)),
247                is_nullable: row.is_nullable.to_lowercase() == "yes",
248                is_primary_key: row.is_primary_key,
249                default_value: row.column_default,
250                comment: row.column_comment,
251                foreign_key,
252            });
253        }
254
255        Ok(TableMetadata {
256            name: table_name.to_string(),
257            schema: schema_name.to_string(),
258            columns,
259            primary_key_columns,
260            comment: None, // Table comments would require another small query
261        })
262    }
263
264    // =================================== NEW METHODS ===================================
265
266    #[instrument(skip(self, view_name), name = "introspect_view")]
267    async fn introspect_view(&self, schema_name: &str, view_name: &str) -> DbResult<ViewMetadata> {
268        let columns_query = r#"
269            SELECT
270                c.column_name::TEXT,
271                c.data_type::TEXT,
272                c.udt_name::TEXT,
273                c.is_nullable::TEXT,
274                c.column_default,
275                pg_catalog.col_description((quote_ident(c.table_schema) || '.' || quote_ident(c.table_name))::regclass::oid, c.ordinal_position) AS column_comment,
276                -- Views do not have primary keys, so this is always false.
277                false AS is_primary_key
278            FROM information_schema.columns c
279            WHERE c.table_schema = $1 AND c.table_name = $2
280            ORDER BY c.ordinal_position;
281        "#;
282
283        let definition_query = "
284            SELECT view_definition::TEXT FROM information_schema.views 
285            WHERE table_schema = $1 AND table_name = $2
286        ";
287
288        let (columns_result, definition_result) = tokio::join!(
289            sqlx::query_as::<_, ColumnIntrospectionRow>(columns_query)
290                .bind(schema_name)
291                .bind(view_name)
292                .fetch_all(&*self.client.pool),
293            sqlx::query_scalar::<_, Option<String>>(definition_query)
294                .bind(schema_name)
295                .bind(view_name)
296                .fetch_one(&*self.client.pool)
297        );
298
299        let column_rows = columns_result?;
300        let definition = definition_result?;
301
302        let columns = column_rows
303            .into_iter()
304            .map(|row| ColumnMetadata {
305                name: row.column_name,
306                sql_type_name: row.data_type.clone(),
307                axion_type: self
308                    .type_mapper
309                    .sql_to_axion(&row.data_type, Some(&row.udt_name)),
310                is_nullable: row.is_nullable.to_lowercase() == "yes",
311                is_primary_key: false, // Views do not have primary keys
312                default_value: row.column_default,
313                comment: row.column_comment,
314                foreign_key: None, // Views do not have foreign keys
315            })
316            .collect();
317
318        Ok(ViewMetadata {
319            name: view_name.to_string(),
320            schema: schema_name.to_string(),
321            columns,
322            definition,
323            comment: None, // View comments would require another query
324        })
325    }
326
327    #[instrument(skip(self), name = "introspect_schema_enums")]
328    async fn introspect_enums_for_schema(
329        &self,
330        schema_name: &str,
331    ) -> DbResult<HashMap<String, EnumMetadata>> {
332        let query = "
333            SELECT
334                t.typname::TEXT AS enum_name,
335                e.enumlabel::TEXT AS enum_value
336            FROM pg_catalog.pg_type t
337            JOIN pg_catalog.pg_namespace n ON n.oid = t.typnamespace
338            JOIN pg_catalog.pg_enum e ON t.oid = e.enumtypid
339            WHERE n.nspname = $1 AND t.typtype = 'e'
340            ORDER BY enum_name, e.enumsortorder;
341        ";
342
343        let rows: Vec<EnumIntrospectionRow> = sqlx::query_as(query)
344            .bind(schema_name)
345            .fetch_all(&*self.client.pool)
346            .await?;
347
348        let mut enums = HashMap::new();
349        for row in rows {
350            enums
351                .entry(row.enum_name.clone())
352                .or_insert_with(|| EnumMetadata {
353                    name: row.enum_name,
354                    schema: schema_name.to_string(),
355                    ..Default::default()
356                })
357                .values
358                .push(row.enum_value);
359        }
360        Ok(enums)
361    }
362
363    // Add this method inside `impl PostgresIntrospector`
364    #[instrument(skip(self), name = "list_user_schemas")]
365    async fn list_user_schemas(&self) -> DbResult<Vec<String>> {
366        let query = "
367        SELECT nspname::TEXT AS schema_name
368        FROM pg_catalog.pg_namespace
369        WHERE nspname NOT IN ('information_schema', 'pg_catalog', 'pg_toast')
370          AND nspname NOT LIKE 'pg_temp_%'
371        ORDER BY schema_name;
372    ";
373        let rows: Vec<(String,)> = sqlx::query_as(query).fetch_all(&*self.client.pool).await?;
374        Ok(rows.into_iter().map(|r| r.0).collect())
375    }
376}