1use deadpool_postgres::Pool;
3use fraiseql_error::{FraiseQLError, Result};
4use tokio_postgres::Row;
5
6use crate::{DatabaseType, introspector::{DatabaseIntrospector, RelationInfo, RelationKind}};
7
8pub struct PostgresIntrospector {
10 pool: Pool,
11}
12
13impl PostgresIntrospector {
14 #[must_use]
16 pub const fn new(pool: Pool) -> Self {
17 Self { pool }
18 }
19}
20
21impl DatabaseIntrospector for PostgresIntrospector {
22 async fn list_fact_tables(&self) -> Result<Vec<String>> {
23 let client = self.pool.get().await.map_err(|e| FraiseQLError::ConnectionPool {
24 message: format!("Failed to acquire connection: {e}"),
25 })?;
26
27 let query = r"
30 SELECT table_name
31 FROM information_schema.tables
32 WHERE table_schema = ANY(current_schemas(false))
33 AND table_type = 'BASE TABLE'
34 AND table_name LIKE 'tf_%'
35 ORDER BY table_name
36 ";
37
38 let rows: Vec<Row> =
39 client.query(query, &[]).await.map_err(|e| FraiseQLError::Database {
40 message: format!("Failed to list fact tables: {e}"),
41 sql_state: e.code().map(|c| c.code().to_string()),
42 })?;
43
44 let tables = rows
45 .into_iter()
46 .map(|row| {
47 let name: String = row.get(0);
48 name
49 })
50 .collect();
51
52 Ok(tables)
53 }
54
55 async fn get_columns(&self, table_name: &str) -> Result<Vec<(String, String, bool)>> {
56 let client = self.pool.get().await.map_err(|e| FraiseQLError::ConnectionPool {
57 message: format!("Failed to acquire connection: {e}"),
58 })?;
59
60 let rows: Vec<Row> = if let Some(dot) = table_name.find('.') {
64 let (schema, name) = (&table_name[..dot], &table_name[dot + 1..]);
65 client
66 .query(
67 r"SELECT column_name, data_type, is_nullable = 'YES' as is_nullable
68 FROM information_schema.columns
69 WHERE table_name = $1 AND table_schema = $2
70 ORDER BY ordinal_position",
71 &[&name, &schema],
72 )
73 .await
74 } else {
75 client
76 .query(
77 r"SELECT column_name, data_type, is_nullable = 'YES' as is_nullable
78 FROM information_schema.columns
79 WHERE table_name = $1
80 AND table_schema = ANY(current_schemas(false))
81 ORDER BY ordinal_position",
82 &[&table_name],
83 )
84 .await
85 }
86 .map_err(|e| FraiseQLError::Database {
87 message: format!("Failed to query column information: {e}"),
88 sql_state: e.code().map(|c| c.code().to_string()),
89 })?;
90
91 let columns = rows
92 .into_iter()
93 .map(|row| {
94 let name: String = row.get(0);
95 let data_type: String = row.get(1);
96 let is_nullable: bool = row.get(2);
97 (name, data_type, is_nullable)
98 })
99 .collect();
100
101 Ok(columns)
102 }
103
104 async fn get_indexed_columns(&self, table_name: &str) -> Result<Vec<String>> {
105 let client = self.pool.get().await.map_err(|e| FraiseQLError::ConnectionPool {
106 message: format!("Failed to acquire connection: {e}"),
107 })?;
108
109 let rows: Vec<Row> = if let Some(dot) = table_name.find('.') {
113 let (schema, name) = (&table_name[..dot], &table_name[dot + 1..]);
114 client
115 .query(
116 r"SELECT DISTINCT a.attname AS column_name
117 FROM pg_index i
118 JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
119 JOIN pg_class t ON t.oid = i.indrelid
120 JOIN pg_namespace n ON n.oid = t.relnamespace
121 WHERE t.relname = $1 AND n.nspname = $2 AND a.attnum > 0
122 ORDER BY column_name",
123 &[&name, &schema],
124 )
125 .await
126 } else {
127 client
128 .query(
129 r"SELECT DISTINCT a.attname AS column_name
130 FROM pg_index i
131 JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
132 JOIN pg_class t ON t.oid = i.indrelid
133 JOIN pg_namespace n ON n.oid = t.relnamespace
134 WHERE t.relname = $1
135 AND n.nspname = ANY(current_schemas(false))
136 AND a.attnum > 0
137 ORDER BY column_name",
138 &[&table_name],
139 )
140 .await
141 }
142 .map_err(|e| FraiseQLError::Database {
143 message: format!("Failed to query index information: {e}"),
144 sql_state: e.code().map(|c| c.code().to_string()),
145 })?;
146
147 let indexed_columns = rows
148 .into_iter()
149 .map(|row| {
150 let name: String = row.get(0);
151 name
152 })
153 .collect();
154
155 Ok(indexed_columns)
156 }
157
158 fn database_type(&self) -> DatabaseType {
159 DatabaseType::PostgreSQL
160 }
161
162 async fn list_relations(&self) -> Result<Vec<RelationInfo>> {
163 let client = self.pool.get().await.map_err(|e| FraiseQLError::ConnectionPool {
164 message: format!("Failed to acquire connection: {e}"),
165 })?;
166
167 let rows: Vec<Row> = client
172 .query(
173 r"SELECT table_schema, table_name,
174 CASE table_type WHEN 'VIEW' THEN 'view' ELSE 'table' END AS kind
175 FROM information_schema.tables
176 WHERE table_schema = ANY(current_schemas(false))
177 ORDER BY table_schema, table_name",
178 &[],
179 )
180 .await
181 .map_err(|e| FraiseQLError::Database {
182 message: format!("Failed to list relations: {e}"),
183 sql_state: e.code().map(|c| c.code().to_string()),
184 })?;
185
186 let relations = rows
187 .into_iter()
188 .map(|row| {
189 let schema: String = row.get(0);
190 let name: String = row.get(1);
191 let kind_str: String = row.get(2);
192 let kind = if kind_str == "view" { RelationKind::View } else { RelationKind::Table };
193 RelationInfo { schema, name, kind }
194 })
195 .collect();
196
197 Ok(relations)
198 }
199
200 async fn get_sample_jsonb(
201 &self,
202 table_name: &str,
203 column_name: &str,
204 ) -> Result<Option<serde_json::Value>> {
205 let client = self.pool.get().await.map_err(|e| FraiseQLError::ConnectionPool {
206 message: format!("Failed to acquire connection: {e}"),
207 })?;
208
209 let query = format!(
212 r#"
213 SELECT "{column}"::text
214 FROM "{table}"
215 WHERE "{column}" IS NOT NULL
216 LIMIT 1
217 "#,
218 table = table_name,
219 column = column_name
220 );
221
222 let rows: Vec<Row> =
223 client.query(&query, &[]).await.map_err(|e| FraiseQLError::Database {
224 message: format!("Failed to query sample JSONB: {e}"),
225 sql_state: e.code().map(|c| c.code().to_string()),
226 })?;
227
228 if rows.is_empty() {
229 return Ok(None);
230 }
231
232 let json_text: Option<String> = rows[0].get(0);
233 if let Some(text) = json_text {
234 let value: serde_json::Value =
235 serde_json::from_str(&text).map_err(|e| FraiseQLError::Parse {
236 message: format!("Failed to parse JSONB sample: {e}"),
237 location: format!("{table_name}.{column_name}"),
238 })?;
239 Ok(Some(value))
240 } else {
241 Ok(None)
242 }
243 }
244}
245
246impl PostgresIntrospector {
247 pub async fn get_indexed_nested_columns(
285 &self,
286 view_name: &str,
287 ) -> Result<std::collections::HashSet<String>> {
288 let client = self.pool.get().await.map_err(|e| FraiseQLError::ConnectionPool {
289 message: format!("Failed to acquire connection: {e}"),
290 })?;
291
292 let query = r"
295 SELECT column_name
296 FROM information_schema.columns
297 WHERE table_name = $1
298 AND table_schema = ANY(current_schemas(false))
299 AND column_name LIKE '%__%'
300 ORDER BY column_name
301 ";
302
303 let rows: Vec<Row> =
304 client.query(query, &[&view_name]).await.map_err(|e| FraiseQLError::Database {
305 message: format!("Failed to query view columns: {e}"),
306 sql_state: e.code().map(|c| c.code().to_string()),
307 })?;
308
309 let indexed_columns: std::collections::HashSet<String> = rows
310 .into_iter()
311 .map(|row| {
312 let name: String = row.get(0);
313 name
314 })
315 .filter(|name| {
316 Self::is_indexed_column_name(name)
320 })
321 .collect();
322
323 Ok(indexed_columns)
324 }
325
326 fn is_indexed_column_name(name: &str) -> bool {
332 if !name.contains("__") {
334 return false;
335 }
336
337 if let Some(rest) = name.strip_prefix('f') {
339 if let Some(underscore_pos) = rest.find("__") {
340 let digits = &rest[..underscore_pos];
341 if digits.chars().all(|c| c.is_ascii_digit()) && !digits.is_empty() {
342 let field_part = &rest[underscore_pos + 2..];
344 if !field_part.is_empty()
345 && field_part.chars().all(|c| c.is_ascii_alphanumeric() || c == '_')
346 && !field_part.starts_with(|c: char| c.is_ascii_digit())
347 {
348 return true;
349 }
350 }
351 }
352 }
353
354 let segments: Vec<&str> = name.split("__").collect();
357 if segments.len() < 2 {
358 return false;
359 }
360
361 if segments[0] == "f" {
363 return false;
364 }
365
366 segments.iter().all(|s| {
369 !s.is_empty()
370 && s.chars().all(|c| c.is_ascii_alphanumeric() || c == '_')
371 && !s.starts_with(|c: char| c.is_ascii_digit())
372 })
373 }
374
375 pub async fn get_view_columns(&self, view_name: &str) -> Result<Vec<String>> {
390 let client = self.pool.get().await.map_err(|e| FraiseQLError::ConnectionPool {
391 message: format!("Failed to acquire connection: {e}"),
392 })?;
393
394 let query = r"
395 SELECT column_name
396 FROM information_schema.columns
397 WHERE table_name = $1
398 AND table_schema = ANY(current_schemas(false))
399 ORDER BY ordinal_position
400 ";
401
402 let rows: Vec<Row> =
403 client.query(query, &[&view_name]).await.map_err(|e| FraiseQLError::Database {
404 message: format!("Failed to query view columns: {e}"),
405 sql_state: e.code().map(|c| c.code().to_string()),
406 })?;
407
408 let columns = rows
409 .into_iter()
410 .map(|row| {
411 let name: String = row.get(0);
412 name
413 })
414 .collect();
415
416 Ok(columns)
417 }
418}
419
420#[cfg(test)]
422mod unit_tests {
423 use super::*;
424
425 #[test]
426 fn test_is_indexed_column_name_human_readable() {
427 assert!(PostgresIntrospector::is_indexed_column_name("items__product"));
429 assert!(PostgresIntrospector::is_indexed_column_name("items__product__category"));
430 assert!(PostgresIntrospector::is_indexed_column_name("items__product__category__code"));
431 assert!(PostgresIntrospector::is_indexed_column_name("order_items__product_name"));
432
433 assert!(!PostgresIntrospector::is_indexed_column_name("items"));
435 assert!(!PostgresIntrospector::is_indexed_column_name("items_product")); assert!(!PostgresIntrospector::is_indexed_column_name("__items")); assert!(!PostgresIntrospector::is_indexed_column_name("items__")); }
439
440 #[test]
441 fn test_is_indexed_column_name_entity_id() {
442 assert!(PostgresIntrospector::is_indexed_column_name("f200100__code"));
444 assert!(PostgresIntrospector::is_indexed_column_name("f1__name"));
445 assert!(PostgresIntrospector::is_indexed_column_name("f123456789__field"));
446
447 assert!(!PostgresIntrospector::is_indexed_column_name("f__code")); assert!(PostgresIntrospector::is_indexed_column_name("fx123__code")); }
453}
454
455#[cfg(all(test, feature = "test-postgres"))]
457mod integration_tests {
458 use deadpool_postgres::{Config, ManagerConfig, RecyclingMethod, Runtime};
459 use tokio_postgres::NoTls;
460
461 use super::*;
462 use crate::postgres::PostgresAdapter;
463
464 const TEST_DB_URL: &str =
465 "postgresql://fraiseql_test:fraiseql_test_password@localhost:5433/test_fraiseql";
466
467 async fn create_test_introspector() -> PostgresIntrospector {
469 let _adapter =
470 PostgresAdapter::new(TEST_DB_URL).await.expect("Failed to create test adapter");
471
472 let mut cfg = Config::new();
476 cfg.url = Some(TEST_DB_URL.to_string());
477 cfg.manager = Some(ManagerConfig {
478 recycling_method: RecyclingMethod::Fast,
479 });
480 cfg.pool = Some(deadpool_postgres::PoolConfig::new(10));
481
482 let pool = cfg.create_pool(Some(Runtime::Tokio1), NoTls).expect("Failed to create pool");
483
484 PostgresIntrospector::new(pool)
485 }
486
487 #[tokio::test]
488 async fn test_get_columns_tf_sales() {
489 let introspector = create_test_introspector().await;
490
491 let columns = introspector.get_columns("tf_sales").await.expect("Failed to get columns");
492
493 assert!(columns.len() >= 10, "Expected at least 10 columns, got {}", columns.len());
496
497 let column_names: Vec<String> = columns.iter().map(|(name, _, _)| name.clone()).collect();
499 assert!(column_names.contains(&"revenue".to_string()));
500 assert!(column_names.contains(&"quantity".to_string()));
501 assert!(column_names.contains(&"data".to_string()));
502 assert!(column_names.contains(&"customer_id".to_string()));
503 }
504
505 #[tokio::test]
506 async fn test_get_indexed_columns_tf_sales() {
507 let introspector = create_test_introspector().await;
508
509 let indexed = introspector
510 .get_indexed_columns("tf_sales")
511 .await
512 .expect("Failed to get indexed columns");
513
514 assert!(indexed.len() >= 4, "Expected at least 4 indexed columns, got {}", indexed.len());
516
517 assert!(indexed.contains(&"customer_id".to_string()));
518 assert!(indexed.contains(&"product_id".to_string()));
519 assert!(indexed.contains(&"occurred_at".to_string()));
520 }
521
522 #[tokio::test]
523 async fn test_database_type() {
524 let introspector = create_test_introspector().await;
525 assert_eq!(introspector.database_type(), DatabaseType::PostgreSQL);
526 }
527}