Skip to main content

fraiseql_db/postgres/
introspector.rs

//! PostgreSQL database introspection for fact tables.
2use deadpool_postgres::Pool;
3use fraiseql_error::{FraiseQLError, Result};
4use tokio_postgres::Row;
5
6use crate::{DatabaseType, introspector::DatabaseIntrospector};
7
8/// PostgreSQL introspector for fact table metadata.
9pub struct PostgresIntrospector {
10    pool: Pool,
11}
12
13impl PostgresIntrospector {
14    /// Create new PostgreSQL introspector from connection pool.
15    #[must_use]
16    pub const fn new(pool: Pool) -> Self {
17        Self { pool }
18    }
19}
20
21impl DatabaseIntrospector for PostgresIntrospector {
22    async fn list_fact_tables(&self) -> Result<Vec<String>> {
23        let client = self.pool.get().await.map_err(|e| FraiseQLError::ConnectionPool {
24            message: format!("Failed to acquire connection: {e}"),
25        })?;
26
27        // Query information_schema for tables matching tf_* pattern
28        let query = r"
29            SELECT table_name
30            FROM information_schema.tables
31            WHERE table_schema = 'public'
32              AND table_type = 'BASE TABLE'
33              AND table_name LIKE 'tf_%'
34            ORDER BY table_name
35        ";
36
37        let rows: Vec<Row> =
38            client.query(query, &[]).await.map_err(|e| FraiseQLError::Database {
39                message:   format!("Failed to list fact tables: {e}"),
40                sql_state: e.code().map(|c| c.code().to_string()),
41            })?;
42
43        let tables = rows
44            .into_iter()
45            .map(|row| {
46                let name: String = row.get(0);
47                name
48            })
49            .collect();
50
51        Ok(tables)
52    }
53
54    async fn get_columns(&self, table_name: &str) -> Result<Vec<(String, String, bool)>> {
55        let client = self.pool.get().await.map_err(|e| FraiseQLError::ConnectionPool {
56            message: format!("Failed to acquire connection: {e}"),
57        })?;
58
59        // Query information_schema for column information
60        let query = r"
61            SELECT
62                column_name,
63                data_type,
64                is_nullable = 'YES' as is_nullable
65            FROM information_schema.columns
66            WHERE table_name = $1
67            AND table_schema = 'public'
68            ORDER BY ordinal_position
69        ";
70
71        let rows: Vec<Row> =
72            client.query(query, &[&table_name]).await.map_err(|e| FraiseQLError::Database {
73                message:   format!("Failed to query column information: {e}"),
74                sql_state: e.code().map(|c| c.code().to_string()),
75            })?;
76
77        let columns = rows
78            .into_iter()
79            .map(|row| {
80                let name: String = row.get(0);
81                let data_type: String = row.get(1);
82                let is_nullable: bool = row.get(2);
83                (name, data_type, is_nullable)
84            })
85            .collect();
86
87        Ok(columns)
88    }
89
90    async fn get_indexed_columns(&self, table_name: &str) -> Result<Vec<String>> {
91        let client = self.pool.get().await.map_err(|e| FraiseQLError::ConnectionPool {
92            message: format!("Failed to acquire connection: {e}"),
93        })?;
94
95        // Query pg_indexes for indexed columns
96        let query = r"
97            SELECT DISTINCT
98                a.attname as column_name
99            FROM
100                pg_index i
101                JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
102                JOIN pg_class t ON t.oid = i.indrelid
103                JOIN pg_namespace n ON n.oid = t.relnamespace
104            WHERE
105                t.relname = $1
106                AND n.nspname = 'public'
107                AND a.attnum > 0
108            ORDER BY column_name
109        ";
110
111        let rows: Vec<Row> =
112            client.query(query, &[&table_name]).await.map_err(|e| FraiseQLError::Database {
113                message:   format!("Failed to query index information: {e}"),
114                sql_state: e.code().map(|c| c.code().to_string()),
115            })?;
116
117        let indexed_columns = rows
118            .into_iter()
119            .map(|row| {
120                let name: String = row.get(0);
121                name
122            })
123            .collect();
124
125        Ok(indexed_columns)
126    }
127
128    fn database_type(&self) -> DatabaseType {
129        DatabaseType::PostgreSQL
130    }
131
132    async fn get_sample_jsonb(
133        &self,
134        table_name: &str,
135        column_name: &str,
136    ) -> Result<Option<serde_json::Value>> {
137        let client = self.pool.get().await.map_err(|e| FraiseQLError::ConnectionPool {
138            message: format!("Failed to acquire connection: {e}"),
139        })?;
140
141        // Query for a sample row with non-null JSON data
142        // Use format! for identifiers (safe because we validate table_name pattern)
143        let query = format!(
144            r#"
145            SELECT "{column}"::text
146            FROM "{table}"
147            WHERE "{column}" IS NOT NULL
148            LIMIT 1
149            "#,
150            table = table_name,
151            column = column_name
152        );
153
154        let rows: Vec<Row> =
155            client.query(&query, &[]).await.map_err(|e| FraiseQLError::Database {
156                message:   format!("Failed to query sample JSONB: {e}"),
157                sql_state: e.code().map(|c| c.code().to_string()),
158            })?;
159
160        if rows.is_empty() {
161            return Ok(None);
162        }
163
164        let json_text: Option<String> = rows[0].get(0);
165        if let Some(text) = json_text {
166            let value: serde_json::Value =
167                serde_json::from_str(&text).map_err(|e| FraiseQLError::Parse {
168                    message:  format!("Failed to parse JSONB sample: {e}"),
169                    location: format!("{table_name}.{column_name}"),
170                })?;
171            Ok(Some(value))
172        } else {
173            Ok(None)
174        }
175    }
176}
177
178impl PostgresIntrospector {
179    /// Get indexed columns for a view/table that match the nested path naming convention.
180    ///
181    /// This method introspects the database to find columns that follow the FraiseQL
182    /// indexed column naming conventions:
183    /// - Human-readable: `items__product__category__code` (double underscore separated)
184    /// - Entity ID format: `f{entity_id}__{field_name}` (e.g., `f200100__code`)
185    ///
186    /// These columns are created by DBAs to optimize filtering on nested GraphQL paths
187    /// by avoiding JSONB extraction at runtime.
188    ///
189    /// # Arguments
190    ///
191    /// * `view_name` - Name of the view or table to introspect
192    ///
193    /// # Returns
194    ///
195    /// Set of column names that match the indexed column naming conventions.
196    ///
197    /// # Errors
198    ///
199    /// Returns `FraiseQLError::ConnectionPool` if a connection cannot be acquired,
200    /// or `FraiseQLError::Database` if the query fails.
201    ///
202    /// # Example
203    ///
204    /// ```no_run
205    /// use fraiseql_db::postgres::PostgresIntrospector;
206    /// # use fraiseql_error::Result;
207    /// use deadpool_postgres::Pool;
208    ///
209    /// # async fn example(pool: Pool) -> Result<()> {
210    /// let introspector = PostgresIntrospector::new(pool);
211    /// let indexed_cols = introspector.get_indexed_nested_columns("v_order_items").await?;
212    /// // Returns: {"items__product__category__code", "f200100__code", ...}
213    /// # Ok(())
214    /// # }
215    /// ```
216    pub async fn get_indexed_nested_columns(
217        &self,
218        view_name: &str,
219    ) -> Result<std::collections::HashSet<String>> {
220        let client = self.pool.get().await.map_err(|e| FraiseQLError::ConnectionPool {
221            message: format!("Failed to acquire connection: {e}"),
222        })?;
223
224        // Query information_schema for columns matching __ pattern
225        // This works for both views and tables
226        let query = r"
227            SELECT column_name
228            FROM information_schema.columns
229            WHERE table_name = $1
230              AND table_schema = 'public'
231              AND column_name LIKE '%__%'
232            ORDER BY column_name
233        ";
234
235        let rows: Vec<Row> =
236            client.query(query, &[&view_name]).await.map_err(|e| FraiseQLError::Database {
237                message:   format!("Failed to query view columns: {e}"),
238                sql_state: e.code().map(|c| c.code().to_string()),
239            })?;
240
241        let indexed_columns: std::collections::HashSet<String> = rows
242            .into_iter()
243            .map(|row| {
244                let name: String = row.get(0);
245                name
246            })
247            .filter(|name| {
248                // Filter to only columns that match our naming conventions:
249                // 1. Human-readable: path__to__field (at least one __ separator)
250                // 2. Entity ID: f{digits}__field_name
251                Self::is_indexed_column_name(name)
252            })
253            .collect();
254
255        Ok(indexed_columns)
256    }
257
258    /// Check if a column name matches the indexed column naming convention.
259    ///
260    /// Valid patterns:
261    /// - `items__product__category__code` (human-readable nested path)
262    /// - `f200100__code` (entity ID format)
263    fn is_indexed_column_name(name: &str) -> bool {
264        // Must contain at least one double underscore
265        if !name.contains("__") {
266            return false;
267        }
268
269        // Check for entity ID format: f{digits}__field
270        if let Some(rest) = name.strip_prefix('f') {
271            if let Some(underscore_pos) = rest.find("__") {
272                let digits = &rest[..underscore_pos];
273                if digits.chars().all(|c| c.is_ascii_digit()) && !digits.is_empty() {
274                    // Verify the field part is valid
275                    let field_part = &rest[underscore_pos + 2..];
276                    if !field_part.is_empty()
277                        && field_part.chars().all(|c| c.is_ascii_alphanumeric() || c == '_')
278                        && !field_part.starts_with(|c: char| c.is_ascii_digit())
279                    {
280                        return true;
281                    }
282                }
283            }
284        }
285
286        // Human-readable format: split by __ and check each segment is valid identifier
287        // Must have at least 2 segments, and first segment must NOT be just 'f'
288        let segments: Vec<&str> = name.split("__").collect();
289        if segments.len() < 2 {
290            return false;
291        }
292
293        // Reject if first segment is just 'f' (reserved for entity ID format)
294        if segments[0] == "f" {
295            return false;
296        }
297
298        // Each segment should be a valid identifier (alphanumeric + underscore, not starting with
299        // digit)
300        segments.iter().all(|s| {
301            !s.is_empty()
302                && s.chars().all(|c| c.is_ascii_alphanumeric() || c == '_')
303                && !s.starts_with(|c: char| c.is_ascii_digit())
304        })
305    }
306
307    /// Get all column names for a view/table.
308    ///
309    /// # Arguments
310    ///
311    /// * `view_name` - Name of the view or table to introspect
312    ///
313    /// # Returns
314    ///
315    /// List of all column names in the view/table.
316    ///
317    /// # Errors
318    ///
319    /// Returns `FraiseQLError::ConnectionPool` if a connection cannot be acquired,
320    /// or `FraiseQLError::Database` if the query fails.
321    pub async fn get_view_columns(&self, view_name: &str) -> Result<Vec<String>> {
322        let client = self.pool.get().await.map_err(|e| FraiseQLError::ConnectionPool {
323            message: format!("Failed to acquire connection: {e}"),
324        })?;
325
326        let query = r"
327            SELECT column_name
328            FROM information_schema.columns
329            WHERE table_name = $1
330              AND table_schema = 'public'
331            ORDER BY ordinal_position
332        ";
333
334        let rows: Vec<Row> =
335            client.query(query, &[&view_name]).await.map_err(|e| FraiseQLError::Database {
336                message:   format!("Failed to query view columns: {e}"),
337                sql_state: e.code().map(|c| c.code().to_string()),
338            })?;
339
340        let columns = rows
341            .into_iter()
342            .map(|row| {
343                let name: String = row.get(0);
344                name
345            })
346            .collect();
347
348        Ok(columns)
349    }
350}
351
/// Unit tests that don't require a PostgreSQL connection.
#[cfg(test)]
mod unit_tests {
    use super::*;

    /// Assert that every name in `names` is classified as `expected`.
    fn assert_classified(names: &[&str], expected: bool) {
        for &name in names {
            assert_eq!(
                PostgresIntrospector::is_indexed_column_name(name),
                expected,
                "is_indexed_column_name({name:?})"
            );
        }
    }

    #[test]
    fn test_is_indexed_column_name_human_readable() {
        assert_classified(
            &[
                "items__product",
                "items__product__category",
                "items__product__category__code",
                "order_items__product_name",
            ],
            true,
        );

        assert_classified(
            &[
                "items",         // no separator at all
                "items_product", // single underscore only
                "__items",       // empty first segment
                "items__",       // empty last segment
            ],
            false,
        );
    }

    #[test]
    fn test_is_indexed_column_name_entity_id() {
        assert_classified(&["f200100__code", "f1__name", "f123456789__field"], true);

        // No digits after `f`, and a bare `f` first segment is reserved for the entity-ID
        // form, so this matches neither convention.
        assert_classified(&["f__code"], false);

        // `fx123` is not an entity ID, but it is a valid identifier, so the human-readable
        // convention accepts it.
        assert_classified(&["fx123__code"], true);
    }
}
386
/// Integration tests that require a PostgreSQL connection.
#[cfg(all(test, feature = "test-postgres"))]
mod integration_tests {
    use deadpool_postgres::{Config, ManagerConfig, PoolConfig, RecyclingMethod, Runtime};
    use tokio_postgres::NoTls;

    use super::*;
    use crate::postgres::PostgresAdapter;

    /// Connection string of the dedicated test database.
    const TEST_DB_URL: &str =
        "postgresql://fraiseql_test:fraiseql_test_password@localhost:5433/test_fraiseql";

    /// Build a `PostgresIntrospector` over a fresh 10-connection pool on `TEST_DB_URL`.
    ///
    /// # Panics
    ///
    /// Panics if `PostgresAdapter::new` fails or the pool cannot be created.
    async fn create_test_introspector() -> PostgresIntrospector {
        // The adapter is created (and kept alive until this function returns) but its pool is
        // not reachable from here, so an independent pool is configured below.
        let _adapter =
            PostgresAdapter::new(TEST_DB_URL).await.expect("Failed to create test adapter");

        let mut config = Config::new();
        config.url = Some(TEST_DB_URL.to_string());
        config.manager = Some(ManagerConfig {
            recycling_method: RecyclingMethod::Fast,
        });
        config.pool = Some(PoolConfig::new(10));

        let pool =
            config.create_pool(Some(Runtime::Tokio1), NoTls).expect("Failed to create pool");
        PostgresIntrospector::new(pool)
    }

    #[tokio::test]
    async fn test_get_columns_tf_sales() {
        let introspector = create_test_introspector().await;
        let columns = introspector.get_columns("tf_sales").await.expect("Failed to get columns");

        // Expected: id, revenue, quantity, cost, discount, data, customer_id, product_id,
        // occurred_at, created_at
        assert!(columns.len() >= 10, "Expected at least 10 columns, got {}", columns.len());

        for wanted in ["revenue", "quantity", "data", "customer_id"] {
            assert!(
                columns.iter().any(|(name, _, _)| name == wanted),
                "missing column {wanted}"
            );
        }
    }

    #[tokio::test]
    async fn test_get_indexed_columns_tf_sales() {
        let introspector = create_test_introspector().await;
        let indexed = introspector
            .get_indexed_columns("tf_sales")
            .await
            .expect("Failed to get indexed columns");

        // Expected indexes on: id (PK), customer_id, product_id, occurred_at, data (GIN)
        assert!(indexed.len() >= 4, "Expected at least 4 indexed columns, got {}", indexed.len());

        for wanted in ["customer_id", "product_id", "occurred_at"] {
            assert!(indexed.iter().any(|col| col == wanted), "missing indexed column {wanted}");
        }
    }

    #[tokio::test]
    async fn test_database_type() {
        let introspector = create_test_introspector().await;
        assert_eq!(introspector.database_type(), DatabaseType::PostgreSQL);
    }
}
455    async fn test_database_type() {
456        let introspector = create_test_introspector().await;
457        assert_eq!(introspector.database_type(), DatabaseType::PostgreSQL);
458    }
459}