Skip to main content

fraiseql_db/postgres/
introspector.rs

//! PostgreSQL database introspection for fact tables.
2use deadpool_postgres::Pool;
3use fraiseql_error::{FraiseQLError, Result};
4use tokio_postgres::Row;
5
6use crate::{DatabaseType, introspector::{DatabaseIntrospector, RelationInfo, RelationKind}};
7
8/// PostgreSQL introspector for fact table metadata.
9pub struct PostgresIntrospector {
10    pool: Pool,
11}
12
13impl PostgresIntrospector {
14    /// Create new PostgreSQL introspector from connection pool.
15    #[must_use]
16    pub const fn new(pool: Pool) -> Self {
17        Self { pool }
18    }
19}
20
21impl DatabaseIntrospector for PostgresIntrospector {
22    async fn list_fact_tables(&self) -> Result<Vec<String>> {
23        let client = self.pool.get().await.map_err(|e| FraiseQLError::ConnectionPool {
24            message: format!("Failed to acquire connection: {e}"),
25        })?;
26
27        // Query information_schema for tables matching tf_* pattern across all schemas
28        // in the current search path.
29        let query = r"
30            SELECT table_name
31            FROM information_schema.tables
32            WHERE table_schema = ANY(current_schemas(false))
33              AND table_type = 'BASE TABLE'
34              AND table_name LIKE 'tf_%'
35            ORDER BY table_name
36        ";
37
38        let rows: Vec<Row> =
39            client.query(query, &[]).await.map_err(|e| FraiseQLError::Database {
40                message:   format!("Failed to list fact tables: {e}"),
41                sql_state: e.code().map(|c| c.code().to_string()),
42            })?;
43
44        let tables = rows
45            .into_iter()
46            .map(|row| {
47                let name: String = row.get(0);
48                name
49            })
50            .collect();
51
52        Ok(tables)
53    }
54
55    async fn get_columns(&self, table_name: &str) -> Result<Vec<(String, String, bool)>> {
56        let client = self.pool.get().await.map_err(|e| FraiseQLError::ConnectionPool {
57            message: format!("Failed to acquire connection: {e}"),
58        })?;
59
60        // Support schema-qualified names like "benchmark.tv_post".
61        // When no schema prefix is present, search across all schemas in the current
62        // search path so that non-public schemas work without explicit qualification.
63        let rows: Vec<Row> = if let Some(dot) = table_name.find('.') {
64            let (schema, name) = (&table_name[..dot], &table_name[dot + 1..]);
65            client
66                .query(
67                    r"SELECT column_name, data_type, is_nullable = 'YES' as is_nullable
68                      FROM information_schema.columns
69                      WHERE table_name = $1 AND table_schema = $2
70                      ORDER BY ordinal_position",
71                    &[&name, &schema],
72                )
73                .await
74        } else {
75            client
76                .query(
77                    r"SELECT column_name, data_type, is_nullable = 'YES' as is_nullable
78                      FROM information_schema.columns
79                      WHERE table_name = $1
80                        AND table_schema = ANY(current_schemas(false))
81                      ORDER BY ordinal_position",
82                    &[&table_name],
83                )
84                .await
85        }
86        .map_err(|e| FraiseQLError::Database {
87            message:   format!("Failed to query column information: {e}"),
88            sql_state: e.code().map(|c| c.code().to_string()),
89        })?;
90
91        let columns = rows
92            .into_iter()
93            .map(|row| {
94                let name: String = row.get(0);
95                let data_type: String = row.get(1);
96                let is_nullable: bool = row.get(2);
97                (name, data_type, is_nullable)
98            })
99            .collect();
100
101        Ok(columns)
102    }
103
104    async fn get_indexed_columns(&self, table_name: &str) -> Result<Vec<String>> {
105        let client = self.pool.get().await.map_err(|e| FraiseQLError::ConnectionPool {
106            message: format!("Failed to acquire connection: {e}"),
107        })?;
108
109        // Support schema-qualified names like "benchmark.tv_post".
110        // When no schema prefix is present, search across all schemas in the current
111        // search path so that non-public schemas work without explicit qualification.
112        let rows: Vec<Row> = if let Some(dot) = table_name.find('.') {
113            let (schema, name) = (&table_name[..dot], &table_name[dot + 1..]);
114            client
115                .query(
116                    r"SELECT DISTINCT a.attname AS column_name
117                      FROM pg_index i
118                      JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
119                      JOIN pg_class t ON t.oid = i.indrelid
120                      JOIN pg_namespace n ON n.oid = t.relnamespace
121                      WHERE t.relname = $1 AND n.nspname = $2 AND a.attnum > 0
122                      ORDER BY column_name",
123                    &[&name, &schema],
124                )
125                .await
126        } else {
127            client
128                .query(
129                    r"SELECT DISTINCT a.attname AS column_name
130                      FROM pg_index i
131                      JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
132                      JOIN pg_class t ON t.oid = i.indrelid
133                      JOIN pg_namespace n ON n.oid = t.relnamespace
134                      WHERE t.relname = $1
135                        AND n.nspname = ANY(current_schemas(false))
136                        AND a.attnum > 0
137                      ORDER BY column_name",
138                    &[&table_name],
139                )
140                .await
141        }
142        .map_err(|e| FraiseQLError::Database {
143            message:   format!("Failed to query index information: {e}"),
144            sql_state: e.code().map(|c| c.code().to_string()),
145        })?;
146
147        let indexed_columns = rows
148            .into_iter()
149            .map(|row| {
150                let name: String = row.get(0);
151                name
152            })
153            .collect();
154
155        Ok(indexed_columns)
156    }
157
158    fn database_type(&self) -> DatabaseType {
159        DatabaseType::PostgreSQL
160    }
161
162    async fn list_relations(&self) -> Result<Vec<RelationInfo>> {
163        let client = self.pool.get().await.map_err(|e| FraiseQLError::ConnectionPool {
164            message: format!("Failed to acquire connection: {e}"),
165        })?;
166
167        // List all tables and views visible in the current search path, excluding
168        // system schemas. Uses current_schemas(false) to match the same scope as
169        // get_columns / get_indexed_columns so that relation existence checks are
170        // consistent with column lookups.
171        let rows: Vec<Row> = client
172            .query(
173                r"SELECT table_schema, table_name,
174                         CASE table_type WHEN 'VIEW' THEN 'view' ELSE 'table' END AS kind
175                  FROM information_schema.tables
176                  WHERE table_schema = ANY(current_schemas(false))
177                  ORDER BY table_schema, table_name",
178                &[],
179            )
180            .await
181            .map_err(|e| FraiseQLError::Database {
182                message:   format!("Failed to list relations: {e}"),
183                sql_state: e.code().map(|c| c.code().to_string()),
184            })?;
185
186        let relations = rows
187            .into_iter()
188            .map(|row| {
189                let schema: String = row.get(0);
190                let name: String = row.get(1);
191                let kind_str: String = row.get(2);
192                let kind = if kind_str == "view" { RelationKind::View } else { RelationKind::Table };
193                RelationInfo { schema, name, kind }
194            })
195            .collect();
196
197        Ok(relations)
198    }
199
200    async fn get_sample_jsonb(
201        &self,
202        table_name: &str,
203        column_name: &str,
204    ) -> Result<Option<serde_json::Value>> {
205        let client = self.pool.get().await.map_err(|e| FraiseQLError::ConnectionPool {
206            message: format!("Failed to acquire connection: {e}"),
207        })?;
208
209        // Query for a sample row with non-null JSON data
210        // Use format! for identifiers (safe because we validate table_name pattern)
211        let query = format!(
212            r#"
213            SELECT "{column}"::text
214            FROM "{table}"
215            WHERE "{column}" IS NOT NULL
216            LIMIT 1
217            "#,
218            table = table_name,
219            column = column_name
220        );
221
222        let rows: Vec<Row> =
223            client.query(&query, &[]).await.map_err(|e| FraiseQLError::Database {
224                message:   format!("Failed to query sample JSONB: {e}"),
225                sql_state: e.code().map(|c| c.code().to_string()),
226            })?;
227
228        if rows.is_empty() {
229            return Ok(None);
230        }
231
232        let json_text: Option<String> = rows[0].get(0);
233        if let Some(text) = json_text {
234            let value: serde_json::Value =
235                serde_json::from_str(&text).map_err(|e| FraiseQLError::Parse {
236                    message:  format!("Failed to parse JSONB sample: {e}"),
237                    location: format!("{table_name}.{column_name}"),
238                })?;
239            Ok(Some(value))
240        } else {
241            Ok(None)
242        }
243    }
244}
245
246impl PostgresIntrospector {
247    /// Get indexed columns for a view/table that match the nested path naming convention.
248    ///
249    /// This method introspects the database to find columns that follow the FraiseQL
250    /// indexed column naming conventions:
251    /// - Human-readable: `items__product__category__code` (double underscore separated)
252    /// - Entity ID format: `f{entity_id}__{field_name}` (e.g., `f200100__code`)
253    ///
254    /// These columns are created by DBAs to optimize filtering on nested GraphQL paths
255    /// by avoiding JSONB extraction at runtime.
256    ///
257    /// # Arguments
258    ///
259    /// * `view_name` - Name of the view or table to introspect
260    ///
261    /// # Returns
262    ///
263    /// Set of column names that match the indexed column naming conventions.
264    ///
265    /// # Errors
266    ///
267    /// Returns `FraiseQLError::ConnectionPool` if a connection cannot be acquired,
268    /// or `FraiseQLError::Database` if the query fails.
269    ///
270    /// # Example
271    ///
272    /// ```no_run
273    /// use fraiseql_db::postgres::PostgresIntrospector;
274    /// # use fraiseql_error::Result;
275    /// use deadpool_postgres::Pool;
276    ///
277    /// # async fn example(pool: Pool) -> Result<()> {
278    /// let introspector = PostgresIntrospector::new(pool);
279    /// let indexed_cols = introspector.get_indexed_nested_columns("v_order_items").await?;
280    /// // Returns: {"items__product__category__code", "f200100__code", ...}
281    /// # Ok(())
282    /// # }
283    /// ```
284    pub async fn get_indexed_nested_columns(
285        &self,
286        view_name: &str,
287    ) -> Result<std::collections::HashSet<String>> {
288        let client = self.pool.get().await.map_err(|e| FraiseQLError::ConnectionPool {
289            message: format!("Failed to acquire connection: {e}"),
290        })?;
291
292        // Query information_schema for columns matching __ pattern.
293        // This works for both views and tables across all schemas in the search path.
294        let query = r"
295            SELECT column_name
296            FROM information_schema.columns
297            WHERE table_name = $1
298              AND table_schema = ANY(current_schemas(false))
299              AND column_name LIKE '%__%'
300            ORDER BY column_name
301        ";
302
303        let rows: Vec<Row> =
304            client.query(query, &[&view_name]).await.map_err(|e| FraiseQLError::Database {
305                message:   format!("Failed to query view columns: {e}"),
306                sql_state: e.code().map(|c| c.code().to_string()),
307            })?;
308
309        let indexed_columns: std::collections::HashSet<String> = rows
310            .into_iter()
311            .map(|row| {
312                let name: String = row.get(0);
313                name
314            })
315            .filter(|name| {
316                // Filter to only columns that match our naming conventions:
317                // 1. Human-readable: path__to__field (at least one __ separator)
318                // 2. Entity ID: f{digits}__field_name
319                Self::is_indexed_column_name(name)
320            })
321            .collect();
322
323        Ok(indexed_columns)
324    }
325
326    /// Check if a column name matches the indexed column naming convention.
327    ///
328    /// Valid patterns:
329    /// - `items__product__category__code` (human-readable nested path)
330    /// - `f200100__code` (entity ID format)
331    fn is_indexed_column_name(name: &str) -> bool {
332        // Must contain at least one double underscore
333        if !name.contains("__") {
334            return false;
335        }
336
337        // Check for entity ID format: f{digits}__field
338        if let Some(rest) = name.strip_prefix('f') {
339            if let Some(underscore_pos) = rest.find("__") {
340                let digits = &rest[..underscore_pos];
341                if digits.chars().all(|c| c.is_ascii_digit()) && !digits.is_empty() {
342                    // Verify the field part is valid
343                    let field_part = &rest[underscore_pos + 2..];
344                    if !field_part.is_empty()
345                        && field_part.chars().all(|c| c.is_ascii_alphanumeric() || c == '_')
346                        && !field_part.starts_with(|c: char| c.is_ascii_digit())
347                    {
348                        return true;
349                    }
350                }
351            }
352        }
353
354        // Human-readable format: split by __ and check each segment is valid identifier
355        // Must have at least 2 segments, and first segment must NOT be just 'f'
356        let segments: Vec<&str> = name.split("__").collect();
357        if segments.len() < 2 {
358            return false;
359        }
360
361        // Reject if first segment is just 'f' (reserved for entity ID format)
362        if segments[0] == "f" {
363            return false;
364        }
365
366        // Each segment should be a valid identifier (alphanumeric + underscore, not starting with
367        // digit)
368        segments.iter().all(|s| {
369            !s.is_empty()
370                && s.chars().all(|c| c.is_ascii_alphanumeric() || c == '_')
371                && !s.starts_with(|c: char| c.is_ascii_digit())
372        })
373    }
374
375    /// Get all column names for a view/table.
376    ///
377    /// # Arguments
378    ///
379    /// * `view_name` - Name of the view or table to introspect
380    ///
381    /// # Returns
382    ///
383    /// List of all column names in the view/table.
384    ///
385    /// # Errors
386    ///
387    /// Returns `FraiseQLError::ConnectionPool` if a connection cannot be acquired,
388    /// or `FraiseQLError::Database` if the query fails.
389    pub async fn get_view_columns(&self, view_name: &str) -> Result<Vec<String>> {
390        let client = self.pool.get().await.map_err(|e| FraiseQLError::ConnectionPool {
391            message: format!("Failed to acquire connection: {e}"),
392        })?;
393
394        let query = r"
395            SELECT column_name
396            FROM information_schema.columns
397            WHERE table_name = $1
398              AND table_schema = ANY(current_schemas(false))
399            ORDER BY ordinal_position
400        ";
401
402        let rows: Vec<Row> =
403            client.query(query, &[&view_name]).await.map_err(|e| FraiseQLError::Database {
404                message:   format!("Failed to query view columns: {e}"),
405                sql_state: e.code().map(|c| c.code().to_string()),
406            })?;
407
408        let columns = rows
409            .into_iter()
410            .map(|row| {
411                let name: String = row.get(0);
412                name
413            })
414            .collect();
415
416        Ok(columns)
417    }
418}
419
/// Unit tests that don't require a PostgreSQL connection.
#[cfg(test)]
mod unit_tests {
    use super::*;

    #[test]
    fn test_is_indexed_column_name_human_readable() {
        // Valid human-readable patterns
        assert!(PostgresIntrospector::is_indexed_column_name("items__product"));
        assert!(PostgresIntrospector::is_indexed_column_name("items__product__category"));
        assert!(PostgresIntrospector::is_indexed_column_name("items__product__category__code"));
        assert!(PostgresIntrospector::is_indexed_column_name("order_items__product_name"));

        // Invalid patterns
        assert!(!PostgresIntrospector::is_indexed_column_name("items"));
        assert!(!PostgresIntrospector::is_indexed_column_name("items_product")); // single underscore
        assert!(!PostgresIntrospector::is_indexed_column_name("__items")); // empty first segment
        assert!(!PostgresIntrospector::is_indexed_column_name("items__")); // empty last segment
    }

    #[test]
    fn test_is_indexed_column_name_entity_id() {
        // Valid entity ID patterns
        assert!(PostgresIntrospector::is_indexed_column_name("f200100__code"));
        assert!(PostgresIntrospector::is_indexed_column_name("f1__name"));
        assert!(PostgresIntrospector::is_indexed_column_name("f123456789__field"));

        // Invalid entity ID patterns (that also aren't valid human-readable)
        assert!(!PostgresIntrospector::is_indexed_column_name("f__code")); // no digits after 'f', and 'f' alone is reserved

        // Note: fx123__code IS valid as a human-readable pattern (fx123 is a valid identifier)
        assert!(PostgresIntrospector::is_indexed_column_name("fx123__code")); // valid as human-readable
    }

    #[test]
    fn test_is_indexed_column_name_rejects_invalid_segments() {
        // A segment may not start with a digit.
        assert!(!PostgresIntrospector::is_indexed_column_name("items__2nd"));
        // An entity-ID field starting with a digit is invalid in both formats.
        assert!(!PostgresIntrospector::is_indexed_column_name("f200100__1code"));
        // Only ASCII alphanumerics and '_' are accepted.
        assert!(!PostgresIntrospector::is_indexed_column_name("items__prod-uct"));
        assert!(!PostgresIntrospector::is_indexed_column_name("items__prodüct"));
    }

    #[test]
    fn test_is_indexed_column_name_non_entity_prefix_falls_back_to_human_readable() {
        // "f12a" is not f{digits}, but it is a valid identifier segment.
        assert!(PostgresIntrospector::is_indexed_column_name("f12a__code"));
        // Uppercase 'F' is not the entity-ID prefix; still a valid nested path.
        assert!(PostgresIntrospector::is_indexed_column_name("F1__name"));
    }
}
454
/// Integration tests that require a PostgreSQL connection.
///
/// Compiled only with the `test-postgres` feature; they expect the database at
/// `TEST_DB_URL` to contain a `tf_sales` fact table.
#[cfg(all(test, feature = "test-postgres"))]
mod integration_tests {
    use deadpool_postgres::{Config, ManagerConfig, RecyclingMethod, Runtime};
    use tokio_postgres::NoTls;

    use super::*;
    use crate::postgres::PostgresAdapter;

    /// Connection string of the dedicated test database.
    const TEST_DB_URL: &str =
        "postgresql://fraiseql_test:fraiseql_test_password@localhost:5433/test_fraiseql";

    /// Build an introspector whose own pool points at the test database.
    async fn create_test_introspector() -> PostgresIntrospector {
        // `PostgresAdapter` does not expose its pool, so the adapter is only
        // constructed (and held until this function returns) while a separate pool
        // is configured for the introspector below.
        let _adapter_handle =
            PostgresAdapter::new(TEST_DB_URL).await.expect("Failed to create test adapter");

        let mut config = Config::new();
        config.url = Some(String::from(TEST_DB_URL));
        config.pool = Some(deadpool_postgres::PoolConfig::new(10));
        config.manager = Some(ManagerConfig {
            recycling_method: RecyclingMethod::Fast,
        });

        let pool = config
            .create_pool(Some(Runtime::Tokio1), NoTls)
            .expect("Failed to create pool");
        PostgresIntrospector::new(pool)
    }

    #[tokio::test]
    async fn test_get_columns_tf_sales() {
        let introspector = create_test_introspector().await;
        let columns = introspector.get_columns("tf_sales").await.expect("Failed to get columns");

        // Expected: id, revenue, quantity, cost, discount, data, customer_id,
        // product_id, occurred_at, created_at.
        assert!(columns.len() >= 10, "Expected at least 10 columns, got {}", columns.len());

        let has_column = |wanted: &str| columns.iter().any(|(name, _, _)| name == wanted);
        for key in ["revenue", "quantity", "data", "customer_id"] {
            assert!(has_column(key), "missing column {key}");
        }
    }

    #[tokio::test]
    async fn test_get_indexed_columns_tf_sales() {
        let introspector = create_test_introspector().await;
        let indexed = introspector
            .get_indexed_columns("tf_sales")
            .await
            .expect("Failed to get indexed columns");

        // Expected indexes: id (PK), customer_id, product_id, occurred_at, data (GIN).
        assert!(indexed.len() >= 4, "Expected at least 4 indexed columns, got {}", indexed.len());
        for key in ["customer_id", "product_id", "occurred_at"] {
            assert!(indexed.iter().any(|column| column == key), "missing index on {key}");
        }
    }

    #[tokio::test]
    async fn test_database_type() {
        let introspector = create_test_introspector().await;
        assert_eq!(introspector.database_type(), DatabaseType::PostgreSQL);
    }
}