1use deadpool_postgres::Pool;
3use fraiseql_error::{FraiseQLError, Result};
4use tokio_postgres::Row;
5
6use crate::{
7 DatabaseType,
8 introspector::{DatabaseIntrospector, RelationInfo, RelationKind},
9};
10
/// PostgreSQL implementation of the database introspection interface.
///
/// Holds the `deadpool_postgres` connection pool from which the introspection
/// queries in this file check out their connections.
pub struct PostgresIntrospector {
    /// Connection pool used to run catalog queries.
    pool: Pool,
}
15
impl PostgresIntrospector {
    /// Creates an introspector that wraps `pool`.
    ///
    /// Only stores the pool; no connection is acquired here.
    #[must_use]
    pub const fn new(pool: Pool) -> Self {
        Self { pool }
    }
}
23
24impl DatabaseIntrospector for PostgresIntrospector {
25 async fn list_fact_tables(&self) -> Result<Vec<String>> {
26 let client = self.pool.get().await.map_err(|e| FraiseQLError::ConnectionPool {
27 message: format!("Failed to acquire connection: {e}"),
28 })?;
29
30 let query = r"
33 SELECT table_name
34 FROM information_schema.tables
35 WHERE table_schema = ANY(current_schemas(false))
36 AND table_type = 'BASE TABLE'
37 AND table_name LIKE 'tf_%'
38 ORDER BY table_name
39 ";
40
41 let rows: Vec<Row> =
42 client.query(query, &[]).await.map_err(|e| FraiseQLError::Database {
43 message: format!("Failed to list fact tables: {e}"),
44 sql_state: e.code().map(|c| c.code().to_string()),
45 })?;
46
47 let tables = rows
48 .into_iter()
49 .map(|row| {
50 let name: String = row.get(0);
51 name
52 })
53 .collect();
54
55 Ok(tables)
56 }
57
58 async fn get_columns(&self, table_name: &str) -> Result<Vec<(String, String, bool)>> {
59 let client = self.pool.get().await.map_err(|e| FraiseQLError::ConnectionPool {
60 message: format!("Failed to acquire connection: {e}"),
61 })?;
62
63 let rows: Vec<Row> = if let Some(dot) = table_name.find('.') {
67 let (schema, name) = (&table_name[..dot], &table_name[dot + 1..]);
68 client
69 .query(
70 r"SELECT column_name, data_type, is_nullable = 'YES' as is_nullable
71 FROM information_schema.columns
72 WHERE table_name = $1 AND table_schema = $2
73 ORDER BY ordinal_position",
74 &[&name, &schema],
75 )
76 .await
77 } else {
78 client
79 .query(
80 r"SELECT column_name, data_type, is_nullable = 'YES' as is_nullable
81 FROM information_schema.columns
82 WHERE table_name = $1
83 AND table_schema = ANY(current_schemas(false))
84 ORDER BY ordinal_position",
85 &[&table_name],
86 )
87 .await
88 }
89 .map_err(|e| FraiseQLError::Database {
90 message: format!("Failed to query column information: {e}"),
91 sql_state: e.code().map(|c| c.code().to_string()),
92 })?;
93
94 let columns = rows
95 .into_iter()
96 .map(|row| {
97 let name: String = row.get(0);
98 let data_type: String = row.get(1);
99 let is_nullable: bool = row.get(2);
100 (name, data_type, is_nullable)
101 })
102 .collect();
103
104 Ok(columns)
105 }
106
107 async fn get_indexed_columns(&self, table_name: &str) -> Result<Vec<String>> {
108 let client = self.pool.get().await.map_err(|e| FraiseQLError::ConnectionPool {
109 message: format!("Failed to acquire connection: {e}"),
110 })?;
111
112 let rows: Vec<Row> = if let Some(dot) = table_name.find('.') {
116 let (schema, name) = (&table_name[..dot], &table_name[dot + 1..]);
117 client
118 .query(
119 r"SELECT DISTINCT a.attname AS column_name
120 FROM pg_index i
121 JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
122 JOIN pg_class t ON t.oid = i.indrelid
123 JOIN pg_namespace n ON n.oid = t.relnamespace
124 WHERE t.relname = $1 AND n.nspname = $2 AND a.attnum > 0
125 ORDER BY column_name",
126 &[&name, &schema],
127 )
128 .await
129 } else {
130 client
131 .query(
132 r"SELECT DISTINCT a.attname AS column_name
133 FROM pg_index i
134 JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
135 JOIN pg_class t ON t.oid = i.indrelid
136 JOIN pg_namespace n ON n.oid = t.relnamespace
137 WHERE t.relname = $1
138 AND n.nspname = ANY(current_schemas(false))
139 AND a.attnum > 0
140 ORDER BY column_name",
141 &[&table_name],
142 )
143 .await
144 }
145 .map_err(|e| FraiseQLError::Database {
146 message: format!("Failed to query index information: {e}"),
147 sql_state: e.code().map(|c| c.code().to_string()),
148 })?;
149
150 let indexed_columns = rows
151 .into_iter()
152 .map(|row| {
153 let name: String = row.get(0);
154 name
155 })
156 .collect();
157
158 Ok(indexed_columns)
159 }
160
161 fn database_type(&self) -> DatabaseType {
162 DatabaseType::PostgreSQL
163 }
164
165 async fn list_relations(&self) -> Result<Vec<RelationInfo>> {
166 let client = self.pool.get().await.map_err(|e| FraiseQLError::ConnectionPool {
167 message: format!("Failed to acquire connection: {e}"),
168 })?;
169
170 let rows: Vec<Row> = client
175 .query(
176 r"SELECT table_schema, table_name,
177 CASE table_type WHEN 'VIEW' THEN 'view' ELSE 'table' END AS kind
178 FROM information_schema.tables
179 WHERE table_schema = ANY(current_schemas(false))
180 ORDER BY table_schema, table_name",
181 &[],
182 )
183 .await
184 .map_err(|e| FraiseQLError::Database {
185 message: format!("Failed to list relations: {e}"),
186 sql_state: e.code().map(|c| c.code().to_string()),
187 })?;
188
189 let relations = rows
190 .into_iter()
191 .map(|row| {
192 let schema: String = row.get(0);
193 let name: String = row.get(1);
194 let kind_str: String = row.get(2);
195 let kind = if kind_str == "view" {
196 RelationKind::View
197 } else {
198 RelationKind::Table
199 };
200 RelationInfo { schema, name, kind }
201 })
202 .collect();
203
204 Ok(relations)
205 }
206
207 async fn get_sample_jsonb(
208 &self,
209 table_name: &str,
210 column_name: &str,
211 ) -> Result<Option<serde_json::Value>> {
212 let client = self.pool.get().await.map_err(|e| FraiseQLError::ConnectionPool {
213 message: format!("Failed to acquire connection: {e}"),
214 })?;
215
216 let query = format!(
219 r#"
220 SELECT "{column}"::text
221 FROM "{table}"
222 WHERE "{column}" IS NOT NULL
223 LIMIT 1
224 "#,
225 table = table_name,
226 column = column_name
227 );
228
229 let rows: Vec<Row> =
230 client.query(&query, &[]).await.map_err(|e| FraiseQLError::Database {
231 message: format!("Failed to query sample JSONB: {e}"),
232 sql_state: e.code().map(|c| c.code().to_string()),
233 })?;
234
235 if rows.is_empty() {
236 return Ok(None);
237 }
238
239 let json_text: Option<String> = rows[0].get(0);
240 if let Some(text) = json_text {
241 let value: serde_json::Value =
242 serde_json::from_str(&text).map_err(|e| FraiseQLError::Parse {
243 message: format!("Failed to parse JSONB sample: {e}"),
244 location: format!("{table_name}.{column_name}"),
245 })?;
246 Ok(Some(value))
247 } else {
248 Ok(None)
249 }
250 }
251}
252
253impl PostgresIntrospector {
254 pub async fn get_indexed_nested_columns(
292 &self,
293 view_name: &str,
294 ) -> Result<std::collections::HashSet<String>> {
295 let client = self.pool.get().await.map_err(|e| FraiseQLError::ConnectionPool {
296 message: format!("Failed to acquire connection: {e}"),
297 })?;
298
299 let query = r"
302 SELECT column_name
303 FROM information_schema.columns
304 WHERE table_name = $1
305 AND table_schema = ANY(current_schemas(false))
306 AND column_name LIKE '%__%'
307 ORDER BY column_name
308 ";
309
310 let rows: Vec<Row> =
311 client.query(query, &[&view_name]).await.map_err(|e| FraiseQLError::Database {
312 message: format!("Failed to query view columns: {e}"),
313 sql_state: e.code().map(|c| c.code().to_string()),
314 })?;
315
316 let indexed_columns: std::collections::HashSet<String> = rows
317 .into_iter()
318 .map(|row| {
319 let name: String = row.get(0);
320 name
321 })
322 .filter(|name| {
323 Self::is_indexed_column_name(name)
327 })
328 .collect();
329
330 Ok(indexed_columns)
331 }
332
333 fn is_indexed_column_name(name: &str) -> bool {
339 if !name.contains("__") {
341 return false;
342 }
343
344 if let Some(rest) = name.strip_prefix('f') {
346 if let Some(underscore_pos) = rest.find("__") {
347 let digits = &rest[..underscore_pos];
348 if digits.chars().all(|c| c.is_ascii_digit()) && !digits.is_empty() {
349 let field_part = &rest[underscore_pos + 2..];
351 if !field_part.is_empty()
352 && field_part.chars().all(|c| c.is_ascii_alphanumeric() || c == '_')
353 && !field_part.starts_with(|c: char| c.is_ascii_digit())
354 {
355 return true;
356 }
357 }
358 }
359 }
360
361 let segments: Vec<&str> = name.split("__").collect();
364 if segments.len() < 2 {
365 return false;
366 }
367
368 if segments[0] == "f" {
370 return false;
371 }
372
373 segments.iter().all(|s| {
376 !s.is_empty()
377 && s.chars().all(|c| c.is_ascii_alphanumeric() || c == '_')
378 && !s.starts_with(|c: char| c.is_ascii_digit())
379 })
380 }
381
382 pub async fn get_view_columns(&self, view_name: &str) -> Result<Vec<String>> {
397 let client = self.pool.get().await.map_err(|e| FraiseQLError::ConnectionPool {
398 message: format!("Failed to acquire connection: {e}"),
399 })?;
400
401 let query = r"
402 SELECT column_name
403 FROM information_schema.columns
404 WHERE table_name = $1
405 AND table_schema = ANY(current_schemas(false))
406 ORDER BY ordinal_position
407 ";
408
409 let rows: Vec<Row> =
410 client.query(query, &[&view_name]).await.map_err(|e| FraiseQLError::Database {
411 message: format!("Failed to query view columns: {e}"),
412 sql_state: e.code().map(|c| c.code().to_string()),
413 })?;
414
415 let columns = rows
416 .into_iter()
417 .map(|row| {
418 let name: String = row.get(0);
419 name
420 })
421 .collect();
422
423 Ok(columns)
424 }
425}
426
#[cfg(test)]
mod unit_tests {
    use super::*;

    /// Checks that `is_indexed_column_name` returns `expected` for every name in `names`.
    fn assert_all(names: &[&str], expected: bool) {
        for name in names {
            assert_eq!(
                PostgresIntrospector::is_indexed_column_name(name),
                expected,
                "unexpected classification for column name {name:?}"
            );
        }
    }

    #[test]
    fn test_is_indexed_column_name_human_readable() {
        // Identifier segments joined by double underscores are accepted.
        assert_all(
            &[
                "items__product",
                "items__product__category",
                "items__product__category__code",
                "order_items__product_name",
            ],
            true,
        );

        // No separator at all, a single underscore, or an empty segment is rejected.
        assert_all(&["items", "items_product", "__items", "items__"], false);
    }

    #[test]
    fn test_is_indexed_column_name_entity_id() {
        // `f` followed by digits, a double underscore and a field name.
        assert_all(&["f200100__code", "f1__name", "f123456789__field"], true);

        // A bare `f` prefix carries no digits and is rejected.
        assert_all(&["f__code"], false);

        // `fx123` is not an entity-ID prefix, but it is still a valid identifier
        // segment, so the name is accepted through the human-readable form.
        assert_all(&["fx123__code"], true);
    }
}
461
#[cfg(all(test, feature = "test-postgres"))]
mod integration_tests {
    use deadpool_postgres::{Config, ManagerConfig, RecyclingMethod, Runtime};
    use tokio_postgres::NoTls;

    use super::*;
    use crate::postgres::PostgresAdapter;

    /// Connection string of the database these tests run against.
    const TEST_DB_URL: &str =
        "postgresql://fraiseql_test:fraiseql_test_password@localhost:5433/test_fraiseql";

    /// Builds an introspector backed by its own pool pointed at `TEST_DB_URL`.
    async fn create_test_introspector() -> PostgresIntrospector {
        // The adapter is created only to fail early if the database is unreachable;
        // the introspector below uses a separate pool.
        let _adapter =
            PostgresAdapter::new(TEST_DB_URL).await.expect("Failed to create test adapter");

        let mut config = Config::new();
        config.url = Some(TEST_DB_URL.to_string());
        config.manager = Some(ManagerConfig {
            recycling_method: RecyclingMethod::Fast,
        });
        config.pool = Some(deadpool_postgres::PoolConfig::new(10));

        PostgresIntrospector::new(
            config.create_pool(Some(Runtime::Tokio1), NoTls).expect("Failed to create pool"),
        )
    }

    #[tokio::test]
    async fn test_get_columns_tf_sales() {
        let introspector = create_test_introspector().await;

        let columns = introspector.get_columns("tf_sales").await.expect("Failed to get columns");

        let count = columns.len();
        assert!(count >= 10, "Expected at least 10 columns, got {count}");

        for expected in ["revenue", "quantity", "data", "customer_id"] {
            assert!(
                columns.iter().any(|(name, _, _)| name == expected),
                "column {expected:?} missing from tf_sales"
            );
        }
    }

    #[tokio::test]
    async fn test_get_indexed_columns_tf_sales() {
        let introspector = create_test_introspector().await;

        let indexed = introspector
            .get_indexed_columns("tf_sales")
            .await
            .expect("Failed to get indexed columns");

        let count = indexed.len();
        assert!(count >= 4, "Expected at least 4 indexed columns, got {count}");

        for expected in ["customer_id", "product_id", "occurred_at"] {
            assert!(
                indexed.iter().any(|name| name == expected),
                "indexed column {expected:?} missing from tf_sales"
            );
        }
    }

    #[tokio::test]
    async fn test_database_type() {
        let introspector = create_test_introspector().await;
        assert_eq!(introspector.database_type(), DatabaseType::PostgreSQL);
    }
}