fraiseql_db/postgres/
introspector.rs1use deadpool_postgres::Pool;
3use fraiseql_error::{FraiseQLError, Result};
4use tokio_postgres::Row;
5
6use crate::{DatabaseType, introspector::DatabaseIntrospector};
7
8pub struct PostgresIntrospector {
10 pool: Pool,
11}
12
13impl PostgresIntrospector {
14 #[must_use]
16 pub const fn new(pool: Pool) -> Self {
17 Self { pool }
18 }
19}
20
21impl DatabaseIntrospector for PostgresIntrospector {
22 async fn list_fact_tables(&self) -> Result<Vec<String>> {
23 let client = self.pool.get().await.map_err(|e| FraiseQLError::ConnectionPool {
24 message: format!("Failed to acquire connection: {e}"),
25 })?;
26
27 let query = r"
29 SELECT table_name
30 FROM information_schema.tables
31 WHERE table_schema = 'public'
32 AND table_type = 'BASE TABLE'
33 AND table_name LIKE 'tf_%'
34 ORDER BY table_name
35 ";
36
37 let rows: Vec<Row> =
38 client.query(query, &[]).await.map_err(|e| FraiseQLError::Database {
39 message: format!("Failed to list fact tables: {e}"),
40 sql_state: e.code().map(|c| c.code().to_string()),
41 })?;
42
43 let tables = rows
44 .into_iter()
45 .map(|row| {
46 let name: String = row.get(0);
47 name
48 })
49 .collect();
50
51 Ok(tables)
52 }
53
54 async fn get_columns(&self, table_name: &str) -> Result<Vec<(String, String, bool)>> {
55 let client = self.pool.get().await.map_err(|e| FraiseQLError::ConnectionPool {
56 message: format!("Failed to acquire connection: {e}"),
57 })?;
58
59 let query = r"
61 SELECT
62 column_name,
63 data_type,
64 is_nullable = 'YES' as is_nullable
65 FROM information_schema.columns
66 WHERE table_name = $1
67 AND table_schema = 'public'
68 ORDER BY ordinal_position
69 ";
70
71 let rows: Vec<Row> =
72 client.query(query, &[&table_name]).await.map_err(|e| FraiseQLError::Database {
73 message: format!("Failed to query column information: {e}"),
74 sql_state: e.code().map(|c| c.code().to_string()),
75 })?;
76
77 let columns = rows
78 .into_iter()
79 .map(|row| {
80 let name: String = row.get(0);
81 let data_type: String = row.get(1);
82 let is_nullable: bool = row.get(2);
83 (name, data_type, is_nullable)
84 })
85 .collect();
86
87 Ok(columns)
88 }
89
90 async fn get_indexed_columns(&self, table_name: &str) -> Result<Vec<String>> {
91 let client = self.pool.get().await.map_err(|e| FraiseQLError::ConnectionPool {
92 message: format!("Failed to acquire connection: {e}"),
93 })?;
94
95 let query = r"
97 SELECT DISTINCT
98 a.attname as column_name
99 FROM
100 pg_index i
101 JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
102 JOIN pg_class t ON t.oid = i.indrelid
103 JOIN pg_namespace n ON n.oid = t.relnamespace
104 WHERE
105 t.relname = $1
106 AND n.nspname = 'public'
107 AND a.attnum > 0
108 ORDER BY column_name
109 ";
110
111 let rows: Vec<Row> =
112 client.query(query, &[&table_name]).await.map_err(|e| FraiseQLError::Database {
113 message: format!("Failed to query index information: {e}"),
114 sql_state: e.code().map(|c| c.code().to_string()),
115 })?;
116
117 let indexed_columns = rows
118 .into_iter()
119 .map(|row| {
120 let name: String = row.get(0);
121 name
122 })
123 .collect();
124
125 Ok(indexed_columns)
126 }
127
128 fn database_type(&self) -> DatabaseType {
129 DatabaseType::PostgreSQL
130 }
131
132 async fn get_sample_jsonb(
133 &self,
134 table_name: &str,
135 column_name: &str,
136 ) -> Result<Option<serde_json::Value>> {
137 let client = self.pool.get().await.map_err(|e| FraiseQLError::ConnectionPool {
138 message: format!("Failed to acquire connection: {e}"),
139 })?;
140
141 let query = format!(
144 r#"
145 SELECT "{column}"::text
146 FROM "{table}"
147 WHERE "{column}" IS NOT NULL
148 LIMIT 1
149 "#,
150 table = table_name,
151 column = column_name
152 );
153
154 let rows: Vec<Row> =
155 client.query(&query, &[]).await.map_err(|e| FraiseQLError::Database {
156 message: format!("Failed to query sample JSONB: {e}"),
157 sql_state: e.code().map(|c| c.code().to_string()),
158 })?;
159
160 if rows.is_empty() {
161 return Ok(None);
162 }
163
164 let json_text: Option<String> = rows[0].get(0);
165 if let Some(text) = json_text {
166 let value: serde_json::Value =
167 serde_json::from_str(&text).map_err(|e| FraiseQLError::Parse {
168 message: format!("Failed to parse JSONB sample: {e}"),
169 location: format!("{table_name}.{column_name}"),
170 })?;
171 Ok(Some(value))
172 } else {
173 Ok(None)
174 }
175 }
176}
177
178impl PostgresIntrospector {
179 pub async fn get_indexed_nested_columns(
217 &self,
218 view_name: &str,
219 ) -> Result<std::collections::HashSet<String>> {
220 let client = self.pool.get().await.map_err(|e| FraiseQLError::ConnectionPool {
221 message: format!("Failed to acquire connection: {e}"),
222 })?;
223
224 let query = r"
227 SELECT column_name
228 FROM information_schema.columns
229 WHERE table_name = $1
230 AND table_schema = 'public'
231 AND column_name LIKE '%__%'
232 ORDER BY column_name
233 ";
234
235 let rows: Vec<Row> =
236 client.query(query, &[&view_name]).await.map_err(|e| FraiseQLError::Database {
237 message: format!("Failed to query view columns: {e}"),
238 sql_state: e.code().map(|c| c.code().to_string()),
239 })?;
240
241 let indexed_columns: std::collections::HashSet<String> = rows
242 .into_iter()
243 .map(|row| {
244 let name: String = row.get(0);
245 name
246 })
247 .filter(|name| {
248 Self::is_indexed_column_name(name)
252 })
253 .collect();
254
255 Ok(indexed_columns)
256 }
257
258 fn is_indexed_column_name(name: &str) -> bool {
264 if !name.contains("__") {
266 return false;
267 }
268
269 if let Some(rest) = name.strip_prefix('f') {
271 if let Some(underscore_pos) = rest.find("__") {
272 let digits = &rest[..underscore_pos];
273 if digits.chars().all(|c| c.is_ascii_digit()) && !digits.is_empty() {
274 let field_part = &rest[underscore_pos + 2..];
276 if !field_part.is_empty()
277 && field_part.chars().all(|c| c.is_ascii_alphanumeric() || c == '_')
278 && !field_part.starts_with(|c: char| c.is_ascii_digit())
279 {
280 return true;
281 }
282 }
283 }
284 }
285
286 let segments: Vec<&str> = name.split("__").collect();
289 if segments.len() < 2 {
290 return false;
291 }
292
293 if segments[0] == "f" {
295 return false;
296 }
297
298 segments.iter().all(|s| {
301 !s.is_empty()
302 && s.chars().all(|c| c.is_ascii_alphanumeric() || c == '_')
303 && !s.starts_with(|c: char| c.is_ascii_digit())
304 })
305 }
306
307 pub async fn get_view_columns(&self, view_name: &str) -> Result<Vec<String>> {
322 let client = self.pool.get().await.map_err(|e| FraiseQLError::ConnectionPool {
323 message: format!("Failed to acquire connection: {e}"),
324 })?;
325
326 let query = r"
327 SELECT column_name
328 FROM information_schema.columns
329 WHERE table_name = $1
330 AND table_schema = 'public'
331 ORDER BY ordinal_position
332 ";
333
334 let rows: Vec<Row> =
335 client.query(query, &[&view_name]).await.map_err(|e| FraiseQLError::Database {
336 message: format!("Failed to query view columns: {e}"),
337 sql_state: e.code().map(|c| c.code().to_string()),
338 })?;
339
340 let columns = rows
341 .into_iter()
342 .map(|row| {
343 let name: String = row.get(0);
344 name
345 })
346 .collect();
347
348 Ok(columns)
349 }
350}
351
352#[cfg(test)]
354mod unit_tests {
355 use super::*;
356
357 #[test]
358 fn test_is_indexed_column_name_human_readable() {
359 assert!(PostgresIntrospector::is_indexed_column_name("items__product"));
361 assert!(PostgresIntrospector::is_indexed_column_name("items__product__category"));
362 assert!(PostgresIntrospector::is_indexed_column_name("items__product__category__code"));
363 assert!(PostgresIntrospector::is_indexed_column_name("order_items__product_name"));
364
365 assert!(!PostgresIntrospector::is_indexed_column_name("items"));
367 assert!(!PostgresIntrospector::is_indexed_column_name("items_product")); assert!(!PostgresIntrospector::is_indexed_column_name("__items")); assert!(!PostgresIntrospector::is_indexed_column_name("items__")); }
371
372 #[test]
373 fn test_is_indexed_column_name_entity_id() {
374 assert!(PostgresIntrospector::is_indexed_column_name("f200100__code"));
376 assert!(PostgresIntrospector::is_indexed_column_name("f1__name"));
377 assert!(PostgresIntrospector::is_indexed_column_name("f123456789__field"));
378
379 assert!(!PostgresIntrospector::is_indexed_column_name("f__code")); assert!(PostgresIntrospector::is_indexed_column_name("fx123__code")); }
385}
386
387#[cfg(all(test, feature = "test-postgres"))]
389mod integration_tests {
390 use deadpool_postgres::{Config, ManagerConfig, RecyclingMethod, Runtime};
391 use tokio_postgres::NoTls;
392
393 use super::*;
394 use crate::postgres::PostgresAdapter;
395
396 const TEST_DB_URL: &str =
397 "postgresql://fraiseql_test:fraiseql_test_password@localhost:5433/test_fraiseql";
398
399 async fn create_test_introspector() -> PostgresIntrospector {
401 let _adapter =
402 PostgresAdapter::new(TEST_DB_URL).await.expect("Failed to create test adapter");
403
404 let mut cfg = Config::new();
408 cfg.url = Some(TEST_DB_URL.to_string());
409 cfg.manager = Some(ManagerConfig {
410 recycling_method: RecyclingMethod::Fast,
411 });
412 cfg.pool = Some(deadpool_postgres::PoolConfig::new(10));
413
414 let pool = cfg.create_pool(Some(Runtime::Tokio1), NoTls).expect("Failed to create pool");
415
416 PostgresIntrospector::new(pool)
417 }
418
419 #[tokio::test]
420 async fn test_get_columns_tf_sales() {
421 let introspector = create_test_introspector().await;
422
423 let columns = introspector.get_columns("tf_sales").await.expect("Failed to get columns");
424
425 assert!(columns.len() >= 10, "Expected at least 10 columns, got {}", columns.len());
428
429 let column_names: Vec<String> = columns.iter().map(|(name, _, _)| name.clone()).collect();
431 assert!(column_names.contains(&"revenue".to_string()));
432 assert!(column_names.contains(&"quantity".to_string()));
433 assert!(column_names.contains(&"data".to_string()));
434 assert!(column_names.contains(&"customer_id".to_string()));
435 }
436
437 #[tokio::test]
438 async fn test_get_indexed_columns_tf_sales() {
439 let introspector = create_test_introspector().await;
440
441 let indexed = introspector
442 .get_indexed_columns("tf_sales")
443 .await
444 .expect("Failed to get indexed columns");
445
446 assert!(indexed.len() >= 4, "Expected at least 4 indexed columns, got {}", indexed.len());
448
449 assert!(indexed.contains(&"customer_id".to_string()));
450 assert!(indexed.contains(&"product_id".to_string()));
451 assert!(indexed.contains(&"occurred_at".to_string()));
452 }
453
454 #[tokio::test]
455 async fn test_database_type() {
456 let introspector = create_test_introspector().await;
457 assert_eq!(introspector.database_type(), DatabaseType::PostgreSQL);
458 }
459}