Skip to main content

fraiseql_db/postgres/
introspector.rs

//! PostgreSQL database introspection for fact tables.
2use deadpool_postgres::Pool;
3use fraiseql_error::{FraiseQLError, Result};
4use tokio_postgres::Row;
5
6use crate::{
7    DatabaseType,
8    introspector::{DatabaseIntrospector, RelationInfo, RelationKind},
9};
10
11/// PostgreSQL introspector for fact table metadata.
12pub struct PostgresIntrospector {
13    pool: Pool,
14}
15
16impl PostgresIntrospector {
17    /// Create new PostgreSQL introspector from connection pool.
18    #[must_use]
19    pub const fn new(pool: Pool) -> Self {
20        Self { pool }
21    }
22}
23
24impl DatabaseIntrospector for PostgresIntrospector {
25    async fn list_fact_tables(&self) -> Result<Vec<String>> {
26        let client = self.pool.get().await.map_err(|e| FraiseQLError::ConnectionPool {
27            message: format!("Failed to acquire connection: {e}"),
28        })?;
29
30        // Query information_schema for tables matching tf_* pattern across all schemas
31        // in the current search path.
32        let query = r"
33            SELECT table_name
34            FROM information_schema.tables
35            WHERE table_schema = ANY(current_schemas(false))
36              AND table_type = 'BASE TABLE'
37              AND table_name LIKE 'tf_%'
38            ORDER BY table_name
39        ";
40
41        let rows: Vec<Row> =
42            client.query(query, &[]).await.map_err(|e| FraiseQLError::Database {
43                message:   format!("Failed to list fact tables: {e}"),
44                sql_state: e.code().map(|c| c.code().to_string()),
45            })?;
46
47        let tables = rows
48            .into_iter()
49            .map(|row| {
50                let name: String = row.get(0);
51                name
52            })
53            .collect();
54
55        Ok(tables)
56    }
57
58    async fn get_columns(&self, table_name: &str) -> Result<Vec<(String, String, bool)>> {
59        let client = self.pool.get().await.map_err(|e| FraiseQLError::ConnectionPool {
60            message: format!("Failed to acquire connection: {e}"),
61        })?;
62
63        // Support schema-qualified names like "benchmark.tv_post".
64        // When no schema prefix is present, search across all schemas in the current
65        // search path so that non-public schemas work without explicit qualification.
66        let rows: Vec<Row> = if let Some(dot) = table_name.find('.') {
67            let (schema, name) = (&table_name[..dot], &table_name[dot + 1..]);
68            client
69                .query(
70                    r"SELECT column_name, data_type, is_nullable = 'YES' as is_nullable
71                      FROM information_schema.columns
72                      WHERE table_name = $1 AND table_schema = $2
73                      ORDER BY ordinal_position",
74                    &[&name, &schema],
75                )
76                .await
77        } else {
78            client
79                .query(
80                    r"SELECT column_name, data_type, is_nullable = 'YES' as is_nullable
81                      FROM information_schema.columns
82                      WHERE table_name = $1
83                        AND table_schema = ANY(current_schemas(false))
84                      ORDER BY ordinal_position",
85                    &[&table_name],
86                )
87                .await
88        }
89        .map_err(|e| FraiseQLError::Database {
90            message:   format!("Failed to query column information: {e}"),
91            sql_state: e.code().map(|c| c.code().to_string()),
92        })?;
93
94        let columns = rows
95            .into_iter()
96            .map(|row| {
97                let name: String = row.get(0);
98                let data_type: String = row.get(1);
99                let is_nullable: bool = row.get(2);
100                (name, data_type, is_nullable)
101            })
102            .collect();
103
104        Ok(columns)
105    }
106
107    async fn get_indexed_columns(&self, table_name: &str) -> Result<Vec<String>> {
108        let client = self.pool.get().await.map_err(|e| FraiseQLError::ConnectionPool {
109            message: format!("Failed to acquire connection: {e}"),
110        })?;
111
112        // Support schema-qualified names like "benchmark.tv_post".
113        // When no schema prefix is present, search across all schemas in the current
114        // search path so that non-public schemas work without explicit qualification.
115        let rows: Vec<Row> = if let Some(dot) = table_name.find('.') {
116            let (schema, name) = (&table_name[..dot], &table_name[dot + 1..]);
117            client
118                .query(
119                    r"SELECT DISTINCT a.attname AS column_name
120                      FROM pg_index i
121                      JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
122                      JOIN pg_class t ON t.oid = i.indrelid
123                      JOIN pg_namespace n ON n.oid = t.relnamespace
124                      WHERE t.relname = $1 AND n.nspname = $2 AND a.attnum > 0
125                      ORDER BY column_name",
126                    &[&name, &schema],
127                )
128                .await
129        } else {
130            client
131                .query(
132                    r"SELECT DISTINCT a.attname AS column_name
133                      FROM pg_index i
134                      JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
135                      JOIN pg_class t ON t.oid = i.indrelid
136                      JOIN pg_namespace n ON n.oid = t.relnamespace
137                      WHERE t.relname = $1
138                        AND n.nspname = ANY(current_schemas(false))
139                        AND a.attnum > 0
140                      ORDER BY column_name",
141                    &[&table_name],
142                )
143                .await
144        }
145        .map_err(|e| FraiseQLError::Database {
146            message:   format!("Failed to query index information: {e}"),
147            sql_state: e.code().map(|c| c.code().to_string()),
148        })?;
149
150        let indexed_columns = rows
151            .into_iter()
152            .map(|row| {
153                let name: String = row.get(0);
154                name
155            })
156            .collect();
157
158        Ok(indexed_columns)
159    }
160
161    fn database_type(&self) -> DatabaseType {
162        DatabaseType::PostgreSQL
163    }
164
165    async fn list_relations(&self) -> Result<Vec<RelationInfo>> {
166        let client = self.pool.get().await.map_err(|e| FraiseQLError::ConnectionPool {
167            message: format!("Failed to acquire connection: {e}"),
168        })?;
169
170        // List all tables and views visible in the current search path, excluding
171        // system schemas. Uses current_schemas(false) to match the same scope as
172        // get_columns / get_indexed_columns so that relation existence checks are
173        // consistent with column lookups.
174        let rows: Vec<Row> = client
175            .query(
176                r"SELECT table_schema, table_name,
177                         CASE table_type WHEN 'VIEW' THEN 'view' ELSE 'table' END AS kind
178                  FROM information_schema.tables
179                  WHERE table_schema = ANY(current_schemas(false))
180                  ORDER BY table_schema, table_name",
181                &[],
182            )
183            .await
184            .map_err(|e| FraiseQLError::Database {
185                message:   format!("Failed to list relations: {e}"),
186                sql_state: e.code().map(|c| c.code().to_string()),
187            })?;
188
189        let relations = rows
190            .into_iter()
191            .map(|row| {
192                let schema: String = row.get(0);
193                let name: String = row.get(1);
194                let kind_str: String = row.get(2);
195                let kind = if kind_str == "view" {
196                    RelationKind::View
197                } else {
198                    RelationKind::Table
199                };
200                RelationInfo { schema, name, kind }
201            })
202            .collect();
203
204        Ok(relations)
205    }
206
207    async fn get_sample_jsonb(
208        &self,
209        table_name: &str,
210        column_name: &str,
211    ) -> Result<Option<serde_json::Value>> {
212        let client = self.pool.get().await.map_err(|e| FraiseQLError::ConnectionPool {
213            message: format!("Failed to acquire connection: {e}"),
214        })?;
215
216        // Query for a sample row with non-null JSON data
217        // Use format! for identifiers (safe because we validate table_name pattern)
218        let query = format!(
219            r#"
220            SELECT "{column}"::text
221            FROM "{table}"
222            WHERE "{column}" IS NOT NULL
223            LIMIT 1
224            "#,
225            table = table_name,
226            column = column_name
227        );
228
229        let rows: Vec<Row> =
230            client.query(&query, &[]).await.map_err(|e| FraiseQLError::Database {
231                message:   format!("Failed to query sample JSONB: {e}"),
232                sql_state: e.code().map(|c| c.code().to_string()),
233            })?;
234
235        if rows.is_empty() {
236            return Ok(None);
237        }
238
239        let json_text: Option<String> = rows[0].get(0);
240        if let Some(text) = json_text {
241            let value: serde_json::Value =
242                serde_json::from_str(&text).map_err(|e| FraiseQLError::Parse {
243                    message:  format!("Failed to parse JSONB sample: {e}"),
244                    location: format!("{table_name}.{column_name}"),
245                })?;
246            Ok(Some(value))
247        } else {
248            Ok(None)
249        }
250    }
251}
252
253impl PostgresIntrospector {
254    /// Get indexed columns for a view/table that match the nested path naming convention.
255    ///
256    /// This method introspects the database to find columns that follow the FraiseQL
257    /// indexed column naming conventions:
258    /// - Human-readable: `items__product__category__code` (double underscore separated)
259    /// - Entity ID format: `f{entity_id}__{field_name}` (e.g., `f200100__code`)
260    ///
261    /// These columns are created by DBAs to optimize filtering on nested GraphQL paths
262    /// by avoiding JSONB extraction at runtime.
263    ///
264    /// # Arguments
265    ///
266    /// * `view_name` - Name of the view or table to introspect
267    ///
268    /// # Returns
269    ///
270    /// Set of column names that match the indexed column naming conventions.
271    ///
272    /// # Errors
273    ///
274    /// Returns `FraiseQLError::ConnectionPool` if a connection cannot be acquired,
275    /// or `FraiseQLError::Database` if the query fails.
276    ///
277    /// # Example
278    ///
279    /// ```no_run
280    /// use fraiseql_db::postgres::PostgresIntrospector;
281    /// # use fraiseql_error::Result;
282    /// use deadpool_postgres::Pool;
283    ///
284    /// # async fn example(pool: Pool) -> Result<()> {
285    /// let introspector = PostgresIntrospector::new(pool);
286    /// let indexed_cols = introspector.get_indexed_nested_columns("v_order_items").await?;
287    /// // Returns: {"items__product__category__code", "f200100__code", ...}
288    /// # Ok(())
289    /// # }
290    /// ```
291    pub async fn get_indexed_nested_columns(
292        &self,
293        view_name: &str,
294    ) -> Result<std::collections::HashSet<String>> {
295        let client = self.pool.get().await.map_err(|e| FraiseQLError::ConnectionPool {
296            message: format!("Failed to acquire connection: {e}"),
297        })?;
298
299        // Query information_schema for columns matching __ pattern.
300        // This works for both views and tables across all schemas in the search path.
301        let query = r"
302            SELECT column_name
303            FROM information_schema.columns
304            WHERE table_name = $1
305              AND table_schema = ANY(current_schemas(false))
306              AND column_name LIKE '%__%'
307            ORDER BY column_name
308        ";
309
310        let rows: Vec<Row> =
311            client.query(query, &[&view_name]).await.map_err(|e| FraiseQLError::Database {
312                message:   format!("Failed to query view columns: {e}"),
313                sql_state: e.code().map(|c| c.code().to_string()),
314            })?;
315
316        let indexed_columns: std::collections::HashSet<String> = rows
317            .into_iter()
318            .map(|row| {
319                let name: String = row.get(0);
320                name
321            })
322            .filter(|name| {
323                // Filter to only columns that match our naming conventions:
324                // 1. Human-readable: path__to__field (at least one __ separator)
325                // 2. Entity ID: f{digits}__field_name
326                Self::is_indexed_column_name(name)
327            })
328            .collect();
329
330        Ok(indexed_columns)
331    }
332
333    /// Check if a column name matches the indexed column naming convention.
334    ///
335    /// Valid patterns:
336    /// - `items__product__category__code` (human-readable nested path)
337    /// - `f200100__code` (entity ID format)
338    fn is_indexed_column_name(name: &str) -> bool {
339        // Must contain at least one double underscore
340        if !name.contains("__") {
341            return false;
342        }
343
344        // Check for entity ID format: f{digits}__field
345        if let Some(rest) = name.strip_prefix('f') {
346            if let Some(underscore_pos) = rest.find("__") {
347                let digits = &rest[..underscore_pos];
348                if digits.chars().all(|c| c.is_ascii_digit()) && !digits.is_empty() {
349                    // Verify the field part is valid
350                    let field_part = &rest[underscore_pos + 2..];
351                    if !field_part.is_empty()
352                        && field_part.chars().all(|c| c.is_ascii_alphanumeric() || c == '_')
353                        && !field_part.starts_with(|c: char| c.is_ascii_digit())
354                    {
355                        return true;
356                    }
357                }
358            }
359        }
360
361        // Human-readable format: split by __ and check each segment is valid identifier
362        // Must have at least 2 segments, and first segment must NOT be just 'f'
363        let segments: Vec<&str> = name.split("__").collect();
364        if segments.len() < 2 {
365            return false;
366        }
367
368        // Reject if first segment is just 'f' (reserved for entity ID format)
369        if segments[0] == "f" {
370            return false;
371        }
372
373        // Each segment should be a valid identifier (alphanumeric + underscore, not starting with
374        // digit)
375        segments.iter().all(|s| {
376            !s.is_empty()
377                && s.chars().all(|c| c.is_ascii_alphanumeric() || c == '_')
378                && !s.starts_with(|c: char| c.is_ascii_digit())
379        })
380    }
381
382    /// Get all column names for a view/table.
383    ///
384    /// # Arguments
385    ///
386    /// * `view_name` - Name of the view or table to introspect
387    ///
388    /// # Returns
389    ///
390    /// List of all column names in the view/table.
391    ///
392    /// # Errors
393    ///
394    /// Returns `FraiseQLError::ConnectionPool` if a connection cannot be acquired,
395    /// or `FraiseQLError::Database` if the query fails.
396    pub async fn get_view_columns(&self, view_name: &str) -> Result<Vec<String>> {
397        let client = self.pool.get().await.map_err(|e| FraiseQLError::ConnectionPool {
398            message: format!("Failed to acquire connection: {e}"),
399        })?;
400
401        let query = r"
402            SELECT column_name
403            FROM information_schema.columns
404            WHERE table_name = $1
405              AND table_schema = ANY(current_schemas(false))
406            ORDER BY ordinal_position
407        ";
408
409        let rows: Vec<Row> =
410            client.query(query, &[&view_name]).await.map_err(|e| FraiseQLError::Database {
411                message:   format!("Failed to query view columns: {e}"),
412                sql_state: e.code().map(|c| c.code().to_string()),
413            })?;
414
415        let columns = rows
416            .into_iter()
417            .map(|row| {
418                let name: String = row.get(0);
419                name
420            })
421            .collect();
422
423        Ok(columns)
424    }
425}
426
/// Pure unit tests for column-name classification; no PostgreSQL connection needed.
#[cfg(test)]
mod unit_tests {
    use super::*;

    /// Assert that every name in `names` classifies as `expected`.
    fn assert_classified(names: &[&str], expected: bool) {
        for &name in names {
            assert_eq!(
                PostgresIntrospector::is_indexed_column_name(name),
                expected,
                "unexpected classification for {name:?}"
            );
        }
    }

    #[test]
    fn test_is_indexed_column_name_human_readable() {
        // Valid human-readable patterns
        assert_classified(
            &[
                "items__product",
                "items__product__category",
                "items__product__category__code",
                "order_items__product_name",
            ],
            true,
        );

        // Invalid patterns: no separator, single underscore, empty first segment,
        // empty last segment.
        assert_classified(&["items", "items_product", "__items", "items__"], false);
    }

    #[test]
    fn test_is_indexed_column_name_entity_id() {
        // Valid entity ID patterns
        assert_classified(&["f200100__code", "f1__name", "f123456789__field"], true);

        // No digits after 'f', and a bare 'f' first segment is reserved, so this is
        // neither a valid entity ID nor a valid human-readable name.
        assert_classified(&["f__code"], false);

        // `fx123` is an ordinary identifier, so this is valid as a human-readable name.
        assert_classified(&["fx123__code"], true);
    }
}
461
/// Integration tests; compiled only with the `test-postgres` feature and expect a
/// PostgreSQL instance reachable at `TEST_DB_URL`.
#[cfg(all(test, feature = "test-postgres"))]
mod integration_tests {
    use deadpool_postgres::{Config, ManagerConfig, RecyclingMethod, Runtime};
    use tokio_postgres::NoTls;

    use super::*;
    use crate::postgres::PostgresAdapter;

    const TEST_DB_URL: &str =
        "postgresql://fraiseql_test:fraiseql_test_password@localhost:5433/test_fraiseql";

    /// Build an introspector backed by a fresh pool pointed at the test database.
    async fn create_test_introspector() -> PostgresIntrospector {
        // The adapter is discarded; only its successful construction is required.
        // Its pool is not reachable from here, so a separate pool is built below.
        let _adapter =
            PostgresAdapter::new(TEST_DB_URL).await.expect("Failed to create test adapter");

        let mut pool_config = Config::new();
        pool_config.url = Some(TEST_DB_URL.to_string());
        pool_config.manager = Some(ManagerConfig {
            recycling_method: RecyclingMethod::Fast,
        });
        pool_config.pool = Some(deadpool_postgres::PoolConfig::new(10));

        let pool = pool_config
            .create_pool(Some(Runtime::Tokio1), NoTls)
            .expect("Failed to create pool");

        PostgresIntrospector::new(pool)
    }

    #[tokio::test]
    async fn test_get_columns_tf_sales() {
        let introspector = create_test_introspector().await;

        let columns = introspector.get_columns("tf_sales").await.expect("Failed to get columns");

        // Expected columns: id, revenue, quantity, cost, discount, data, customer_id,
        // product_id, occurred_at, created_at
        let column_count = columns.len();
        assert!(column_count >= 10, "Expected at least 10 columns, got {}", column_count);

        // Check for key columns
        for expected in ["revenue", "quantity", "data", "customer_id"] {
            assert!(
                columns.iter().any(|(name, _, _)| name == expected),
                "missing column {expected:?}"
            );
        }
    }

    #[tokio::test]
    async fn test_get_indexed_columns_tf_sales() {
        let introspector = create_test_introspector().await;

        let indexed = introspector
            .get_indexed_columns("tf_sales")
            .await
            .expect("Failed to get indexed columns");

        // Expected indexes: id (PK), customer_id, product_id, occurred_at, data (GIN)
        let indexed_count = indexed.len();
        assert!(indexed_count >= 4, "Expected at least 4 indexed columns, got {}", indexed_count);

        for expected in ["customer_id", "product_id", "occurred_at"] {
            assert!(
                indexed.iter().any(|column| column == expected),
                "missing indexed column {expected:?}"
            );
        }
    }

    #[tokio::test]
    async fn test_database_type() {
        let introspector = create_test_introspector().await;
        assert_eq!(introspector.database_type(), DatabaseType::PostgreSQL);
    }
}