fraiseql_db/postgres/
introspector.rs1use deadpool_postgres::Pool;
3use fraiseql_error::{FraiseQLError, Result};
4use tokio_postgres::Row;
5
6use crate::{
7 DatabaseType,
8 introspector::{DatabaseIntrospector, RelationInfo, RelationKind},
9};
10
11pub struct PostgresIntrospector {
13 pool: Pool,
14}
15
16impl PostgresIntrospector {
17 #[must_use]
19 pub const fn new(pool: Pool) -> Self {
20 Self { pool }
21 }
22}
23
24impl DatabaseIntrospector for PostgresIntrospector {
25 async fn list_fact_tables(&self) -> Result<Vec<String>> {
26 let client = self.pool.get().await.map_err(|e| FraiseQLError::ConnectionPool {
27 message: format!("Failed to acquire connection: {e}"),
28 })?;
29
30 let query = r"
33 SELECT table_name
34 FROM information_schema.tables
35 WHERE table_schema = ANY(current_schemas(false))
36 AND table_type = 'BASE TABLE'
37 AND table_name LIKE 'tf_%'
38 ORDER BY table_name
39 ";
40
41 let rows: Vec<Row> =
42 client.query(query, &[]).await.map_err(|e| FraiseQLError::Database {
43 message: format!("Failed to list fact tables: {e}"),
44 sql_state: e.code().map(|c| c.code().to_string()),
45 })?;
46
47 let tables = rows
48 .into_iter()
49 .map(|row| {
50 let name: String = row.get(0);
51 name
52 })
53 .collect();
54
55 Ok(tables)
56 }
57
58 async fn get_columns(&self, table_name: &str) -> Result<Vec<(String, String, bool)>> {
59 let client = self.pool.get().await.map_err(|e| FraiseQLError::ConnectionPool {
60 message: format!("Failed to acquire connection: {e}"),
61 })?;
62
63 let rows: Vec<Row> = if let Some(dot) = table_name.find('.') {
67 let (schema, name) = (&table_name[..dot], &table_name[dot + 1..]);
68 client
69 .query(
70 r"SELECT column_name, data_type, is_nullable = 'YES' as is_nullable
71 FROM information_schema.columns
72 WHERE table_name = $1 AND table_schema = $2
73 ORDER BY ordinal_position",
74 &[&name, &schema],
75 )
76 .await
77 } else {
78 client
79 .query(
80 r"SELECT column_name, data_type, is_nullable = 'YES' as is_nullable
81 FROM information_schema.columns
82 WHERE table_name = $1
83 AND table_schema = ANY(current_schemas(false))
84 ORDER BY ordinal_position",
85 &[&table_name],
86 )
87 .await
88 }
89 .map_err(|e| FraiseQLError::Database {
90 message: format!("Failed to query column information: {e}"),
91 sql_state: e.code().map(|c| c.code().to_string()),
92 })?;
93
94 let columns = rows
95 .into_iter()
96 .map(|row| {
97 let name: String = row.get(0);
98 let data_type: String = row.get(1);
99 let is_nullable: bool = row.get(2);
100 (name, data_type, is_nullable)
101 })
102 .collect();
103
104 Ok(columns)
105 }
106
107 async fn get_indexed_columns(&self, table_name: &str) -> Result<Vec<String>> {
108 let client = self.pool.get().await.map_err(|e| FraiseQLError::ConnectionPool {
109 message: format!("Failed to acquire connection: {e}"),
110 })?;
111
112 let rows: Vec<Row> = if let Some(dot) = table_name.find('.') {
116 let (schema, name) = (&table_name[..dot], &table_name[dot + 1..]);
117 client
118 .query(
119 r"SELECT DISTINCT a.attname AS column_name
120 FROM pg_index i
121 JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
122 JOIN pg_class t ON t.oid = i.indrelid
123 JOIN pg_namespace n ON n.oid = t.relnamespace
124 WHERE t.relname = $1 AND n.nspname = $2 AND a.attnum > 0
125 ORDER BY column_name",
126 &[&name, &schema],
127 )
128 .await
129 } else {
130 client
131 .query(
132 r"SELECT DISTINCT a.attname AS column_name
133 FROM pg_index i
134 JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
135 JOIN pg_class t ON t.oid = i.indrelid
136 JOIN pg_namespace n ON n.oid = t.relnamespace
137 WHERE t.relname = $1
138 AND n.nspname = ANY(current_schemas(false))
139 AND a.attnum > 0
140 ORDER BY column_name",
141 &[&table_name],
142 )
143 .await
144 }
145 .map_err(|e| FraiseQLError::Database {
146 message: format!("Failed to query index information: {e}"),
147 sql_state: e.code().map(|c| c.code().to_string()),
148 })?;
149
150 let indexed_columns = rows
151 .into_iter()
152 .map(|row| {
153 let name: String = row.get(0);
154 name
155 })
156 .collect();
157
158 Ok(indexed_columns)
159 }
160
161 fn database_type(&self) -> DatabaseType {
162 DatabaseType::PostgreSQL
163 }
164
165 async fn list_relations(&self) -> Result<Vec<RelationInfo>> {
166 let client = self.pool.get().await.map_err(|e| FraiseQLError::ConnectionPool {
167 message: format!("Failed to acquire connection: {e}"),
168 })?;
169
170 let rows: Vec<Row> = client
175 .query(
176 r"SELECT table_schema, table_name,
177 CASE table_type WHEN 'VIEW' THEN 'view' ELSE 'table' END AS kind
178 FROM information_schema.tables
179 WHERE table_schema = ANY(current_schemas(false))
180 ORDER BY table_schema, table_name",
181 &[],
182 )
183 .await
184 .map_err(|e| FraiseQLError::Database {
185 message: format!("Failed to list relations: {e}"),
186 sql_state: e.code().map(|c| c.code().to_string()),
187 })?;
188
189 let relations = rows
190 .into_iter()
191 .map(|row| {
192 let schema: String = row.get(0);
193 let name: String = row.get(1);
194 let kind_str: String = row.get(2);
195 let kind = if kind_str == "view" {
196 RelationKind::View
197 } else {
198 RelationKind::Table
199 };
200 RelationInfo { schema, name, kind }
201 })
202 .collect();
203
204 Ok(relations)
205 }
206
207 async fn get_sample_jsonb(
208 &self,
209 table_name: &str,
210 column_name: &str,
211 ) -> Result<Option<serde_json::Value>> {
212 let client = self.pool.get().await.map_err(|e| FraiseQLError::ConnectionPool {
213 message: format!("Failed to acquire connection: {e}"),
214 })?;
215
216 let query = format!(
219 r#"
220 SELECT "{column}"::text
221 FROM "{table}"
222 WHERE "{column}" IS NOT NULL
223 LIMIT 1
224 "#,
225 table = table_name,
226 column = column_name
227 );
228
229 let rows: Vec<Row> =
230 client.query(&query, &[]).await.map_err(|e| FraiseQLError::Database {
231 message: format!("Failed to query sample JSONB: {e}"),
232 sql_state: e.code().map(|c| c.code().to_string()),
233 })?;
234
235 if rows.is_empty() {
236 return Ok(None);
237 }
238
239 let json_text: Option<String> = rows[0].get(0);
240 if let Some(text) = json_text {
241 let value: serde_json::Value =
242 serde_json::from_str(&text).map_err(|e| FraiseQLError::Parse {
243 message: format!("Failed to parse JSONB sample: {e}"),
244 location: format!("{table_name}.{column_name}"),
245 })?;
246 Ok(Some(value))
247 } else {
248 Ok(None)
249 }
250 }
251}
252
253impl PostgresIntrospector {
254 pub async fn get_indexed_nested_columns(
292 &self,
293 view_name: &str,
294 ) -> Result<std::collections::HashSet<String>> {
295 let client = self.pool.get().await.map_err(|e| FraiseQLError::ConnectionPool {
296 message: format!("Failed to acquire connection: {e}"),
297 })?;
298
299 let query = r"
302 SELECT column_name
303 FROM information_schema.columns
304 WHERE table_name = $1
305 AND table_schema = ANY(current_schemas(false))
306 AND column_name LIKE '%__%'
307 ORDER BY column_name
308 ";
309
310 let rows: Vec<Row> =
311 client.query(query, &[&view_name]).await.map_err(|e| FraiseQLError::Database {
312 message: format!("Failed to query view columns: {e}"),
313 sql_state: e.code().map(|c| c.code().to_string()),
314 })?;
315
316 let indexed_columns: std::collections::HashSet<String> = rows
317 .into_iter()
318 .map(|row| {
319 let name: String = row.get(0);
320 name
321 })
322 .filter(|name| {
323 Self::is_indexed_column_name(name)
327 })
328 .collect();
329
330 Ok(indexed_columns)
331 }
332
333 fn is_indexed_column_name(name: &str) -> bool {
339 if !name.contains("__") {
341 return false;
342 }
343
344 if let Some(rest) = name.strip_prefix('f') {
346 if let Some(underscore_pos) = rest.find("__") {
347 let digits = &rest[..underscore_pos];
348 if digits.chars().all(|c| c.is_ascii_digit()) && !digits.is_empty() {
349 let field_part = &rest[underscore_pos + 2..];
351 if !field_part.is_empty()
352 && field_part.chars().all(|c| c.is_ascii_alphanumeric() || c == '_')
353 && !field_part.starts_with(|c: char| c.is_ascii_digit())
354 {
355 return true;
356 }
357 }
358 }
359 }
360
361 let segments: Vec<&str> = name.split("__").collect();
364 if segments.len() < 2 {
365 return false;
366 }
367
368 if segments[0] == "f" {
370 return false;
371 }
372
373 segments.iter().all(|s| {
376 !s.is_empty()
377 && s.chars().all(|c| c.is_ascii_alphanumeric() || c == '_')
378 && !s.starts_with(|c: char| c.is_ascii_digit())
379 })
380 }
381
382 pub async fn get_view_columns(&self, view_name: &str) -> Result<Vec<String>> {
397 let client = self.pool.get().await.map_err(|e| FraiseQLError::ConnectionPool {
398 message: format!("Failed to acquire connection: {e}"),
399 })?;
400
401 let query = r"
402 SELECT column_name
403 FROM information_schema.columns
404 WHERE table_name = $1
405 AND table_schema = ANY(current_schemas(false))
406 ORDER BY ordinal_position
407 ";
408
409 let rows: Vec<Row> =
410 client.query(query, &[&view_name]).await.map_err(|e| FraiseQLError::Database {
411 message: format!("Failed to query view columns: {e}"),
412 sql_state: e.code().map(|c| c.code().to_string()),
413 })?;
414
415 let columns = rows
416 .into_iter()
417 .map(|row| {
418 let name: String = row.get(0);
419 name
420 })
421 .collect();
422
423 Ok(columns)
424 }
425}
426
427#[cfg(test)]
428mod tests;