1use async_trait::async_trait;
4use bytes::BufMut as _;
5use fraiseql_error::{FraiseQLError, Result};
6use tokio_postgres::Row;
7
8use super::{PostgresAdapter, build_where_select_sql, build_where_select_sql_ordered};
9use crate::{
10 identifier::quote_postgres_identifier,
11 traits::{DatabaseAdapter, SupportsMutations},
12 types::{
13 DatabaseType, JsonbValue, PoolMetrics, QueryParam,
14 sql_hints::{OrderByClause, SqlProjectionHint},
15 },
16 where_clause::WhereClause,
17};
18
19const PG_UNDEFINED_COLUMN: &str = "42703";
21
22#[derive(Debug)]
37enum FlexParam {
38 Null,
40 Text(String),
42}
43
44impl tokio_postgres::types::ToSql for FlexParam {
45 fn to_sql(
46 &self,
47 ty: &tokio_postgres::types::Type,
48 out: &mut bytes::BytesMut,
49 ) -> std::result::Result<tokio_postgres::types::IsNull, Box<dyn std::error::Error + Sync + Send>>
50 {
51 use tokio_postgres::types::{IsNull, Type};
52 match self {
53 Self::Null => Ok(IsNull::Yes),
54 Self::Text(s) => {
55 if *ty == Type::JSONB {
56 out.put_u8(1);
58 out.extend_from_slice(s.as_bytes());
59 } else if *ty == Type::JSON {
60 out.extend_from_slice(s.as_bytes());
61 } else if *ty == Type::UUID {
62 let uuid = uuid::Uuid::parse_str(s)?;
63 out.extend_from_slice(uuid.as_bytes());
64 } else if *ty == Type::INT4 {
65 let n: i32 = s.parse()?;
66 out.put_i32(n);
67 } else if *ty == Type::INT8 {
68 let n: i64 = s.parse()?;
69 out.put_i64(n);
70 } else if *ty == Type::BOOL {
71 let b: bool = s.parse()?;
72 out.put_u8(u8::from(b));
73 } else {
74 out.extend_from_slice(s.as_bytes());
77 }
78 Ok(IsNull::No)
79 },
80 }
81 }
82
83 fn accepts(_ty: &tokio_postgres::types::Type) -> bool {
84 true
86 }
87
88 fn to_sql_checked(
89 &self,
90 ty: &tokio_postgres::types::Type,
91 out: &mut bytes::BytesMut,
92 ) -> std::result::Result<tokio_postgres::types::IsNull, Box<dyn std::error::Error + Sync + Send>>
93 {
94 self.to_sql(ty, out)
97 }
98}
99
100fn enrich_undefined_column_error(
108 err: FraiseQLError,
109 view: &str,
110 where_clause: Option<&WhereClause>,
111) -> FraiseQLError {
112 let FraiseQLError::Database { ref sql_state, .. } = err else {
113 return err;
114 };
115 if sql_state.as_deref() != Some(PG_UNDEFINED_COLUMN) {
116 return err;
117 }
118 let native_cols: Vec<&str> =
119 where_clause.map(|wc| wc.native_column_names()).unwrap_or_default();
120 if native_cols.is_empty() {
121 return err;
122 }
123 FraiseQLError::Database {
124 message: format!(
125 "Column(s) {:?} referenced as native column(s) on `{view}` do not exist. \
126 These columns were auto-inferred from ID/UUID-typed query arguments. \
127 Either add the column(s) to the table/view, or set \
128 `native_columns = {{}}` explicitly in your schema to disable inference.",
129 native_cols,
130 ),
131 sql_state: Some(PG_UNDEFINED_COLUMN.to_string()),
132 }
133}
134
135fn row_to_map(row: &Row) -> std::collections::HashMap<String, serde_json::Value> {
140 let mut map = std::collections::HashMap::new();
141 for (idx, column) in row.columns().iter().enumerate() {
142 let column_name = column.name().to_string();
143 let value: serde_json::Value = if let Ok(v) = row.try_get::<_, i32>(idx) {
144 serde_json::json!(v)
145 } else if let Ok(v) = row.try_get::<_, i64>(idx) {
146 serde_json::json!(v)
147 } else if let Ok(v) = row.try_get::<_, f64>(idx) {
148 serde_json::json!(v)
149 } else if let Ok(v) = row.try_get::<_, String>(idx) {
150 serde_json::json!(v)
151 } else if let Ok(v) = row.try_get::<_, bool>(idx) {
152 serde_json::json!(v)
153 } else if let Ok(v) = row.try_get::<_, serde_json::Value>(idx) {
154 v
155 } else {
156 serde_json::Value::Null
157 };
158 map.insert(column_name, value);
159 }
160 map
161}
162
163#[async_trait]
167impl DatabaseAdapter for PostgresAdapter {
168 async fn execute_with_projection(
169 &self,
170 view: &str,
171 projection: Option<&SqlProjectionHint>,
172 where_clause: Option<&WhereClause>,
173 limit: Option<u32>,
174 offset: Option<u32>,
175 order_by: Option<&[OrderByClause]>,
176 ) -> Result<Vec<JsonbValue>> {
177 self.execute_with_projection_impl(view, projection, where_clause, limit, offset, order_by)
178 .await
179 }
180
181 async fn execute_where_query(
182 &self,
183 view: &str,
184 where_clause: Option<&WhereClause>,
185 limit: Option<u32>,
186 offset: Option<u32>,
187 order_by: Option<&[OrderByClause]>,
188 ) -> Result<Vec<JsonbValue>> {
189 let (sql, typed_params) =
190 build_where_select_sql_ordered(view, where_clause, limit, offset, order_by)?;
191
192 let param_refs = crate::types::as_sql_param_refs(&typed_params);
193
194 self.execute_raw(&sql, ¶m_refs)
195 .await
196 .map_err(|e| enrich_undefined_column_error(e, view, where_clause))
197 }
198
199 async fn explain_where_query(
200 &self,
201 view: &str,
202 where_clause: Option<&WhereClause>,
203 limit: Option<u32>,
204 offset: Option<u32>,
205 ) -> Result<serde_json::Value> {
206 let (select_sql, typed_params) = build_where_select_sql(view, where_clause, limit, offset)?;
207 if select_sql.contains(';') {
210 return Err(FraiseQLError::Validation {
211 message: "EXPLAIN SQL must be a single statement".into(),
212 path: None,
213 });
214 }
215 let explain_sql = format!("EXPLAIN (ANALYZE, BUFFERS, FORMAT JSON) {select_sql}");
216
217 let param_refs = crate::types::as_sql_param_refs(&typed_params);
218
219 let client = self.acquire_connection_with_retry().await?;
220 let rows = client.query(explain_sql.as_str(), ¶m_refs).await.map_err(|e| {
221 FraiseQLError::Database {
222 message: format!("EXPLAIN ANALYZE failed: {e}"),
223 sql_state: e.code().map(|c| c.code().to_string()),
224 }
225 })?;
226
227 if let Some(row) = rows.first() {
228 let plan: serde_json::Value = row.try_get(0).map_err(|e| FraiseQLError::Database {
229 message: format!("Failed to parse EXPLAIN output: {e}"),
230 sql_state: None,
231 })?;
232 Ok(plan)
233 } else {
234 Ok(serde_json::Value::Null)
235 }
236 }
237
238 fn database_type(&self) -> DatabaseType {
239 DatabaseType::PostgreSQL
240 }
241
242 async fn health_check(&self) -> Result<()> {
243 let client = self.acquire_connection_with_retry().await?;
245
246 client.query("SELECT 1", &[]).await.map_err(|e| FraiseQLError::Database {
247 message: format!("Health check failed: {e}"),
248 sql_state: e.code().map(|c| c.code().to_string()),
249 })?;
250
251 Ok(())
252 }
253
254 #[allow(clippy::cast_possible_truncation)] fn pool_metrics(&self) -> PoolMetrics {
256 let status = self.pool.status();
257
258 PoolMetrics {
259 total_connections: status.size as u32,
260 idle_connections: status.available as u32,
261 active_connections: (status.size - status.available) as u32,
262 waiting_requests: status.waiting as u32,
263 }
264 }
265
266 async fn execute_raw_query(
271 &self,
272 sql: &str,
273 ) -> Result<Vec<std::collections::HashMap<String, serde_json::Value>>> {
274 let client = self.acquire_connection_with_retry().await?;
276
277 let rows: Vec<Row> = client.query(sql, &[]).await.map_err(|e| FraiseQLError::Database {
278 message: format!("Query execution failed: {e}"),
279 sql_state: e.code().map(|c| c.code().to_string()),
280 })?;
281
282 let results: Vec<std::collections::HashMap<String, serde_json::Value>> =
284 rows.iter().map(row_to_map).collect();
285
286 Ok(results)
287 }
288
289 async fn execute_parameterized_aggregate(
290 &self,
291 sql: &str,
292 params: &[serde_json::Value],
293 ) -> Result<Vec<std::collections::HashMap<String, serde_json::Value>>> {
294 let typed: Vec<QueryParam> = params.iter().cloned().map(QueryParam::from).collect();
298 let param_refs = crate::types::as_sql_param_refs(&typed);
299
300 let client = self.acquire_connection_with_retry().await?;
301 let rows: Vec<Row> =
302 client.query(sql, ¶m_refs).await.map_err(|e| FraiseQLError::Database {
303 message: format!("Parameterized aggregate query failed: {e}"),
304 sql_state: e.code().map(|c| c.code().to_string()),
305 })?;
306
307 let results: Vec<std::collections::HashMap<String, serde_json::Value>> =
308 rows.iter().map(row_to_map).collect();
309
310 Ok(results)
311 }
312
313 async fn execute_function_call(
314 &self,
315 function_name: &str,
316 args: &[serde_json::Value],
317 ) -> Result<Vec<std::collections::HashMap<String, serde_json::Value>>> {
318 let quoted_fn = quote_postgres_identifier(function_name);
324 let placeholders: Vec<String> = (1..=args.len()).map(|i| format!("${i}")).collect();
325 let sql = format!("SELECT * FROM {quoted_fn}({})", placeholders.join(", "));
326
327 let mut client = self.acquire_connection_with_retry().await?;
328
329 let flex_args: Vec<FlexParam> = args
337 .iter()
338 .map(|v| match v {
339 serde_json::Value::Null => FlexParam::Null,
340 serde_json::Value::String(s) => FlexParam::Text(s.clone()),
341 _ => FlexParam::Text(v.to_string()),
342 })
343 .collect();
344 let params: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = flex_args
345 .iter()
346 .map(|v| v as &(dyn tokio_postgres::types::ToSql + Sync))
347 .collect();
348
349 if self.mutation_timing_enabled {
350 let txn =
354 client.build_transaction().start().await.map_err(|e| FraiseQLError::Database {
355 message: format!("Failed to start mutation timing transaction: {e}"),
356 sql_state: e.code().map(|c| c.code().to_string()),
357 })?;
358
359 txn.execute(
360 "SELECT set_config($1, clock_timestamp()::text, true)",
361 &[&self.timing_variable_name],
362 )
363 .await
364 .map_err(|e| FraiseQLError::Database {
365 message: format!("Failed to set mutation timing variable: {e}"),
366 sql_state: e.code().map(|c| c.code().to_string()),
367 })?;
368
369 let rows: Vec<Row> = txn.query(sql.as_str(), params.as_slice()).await.map_err(|e| {
370 let detail = e.as_db_error().map_or("", |d| d.message());
371 FraiseQLError::Database {
372 message: format!("Function call {function_name} failed: {e}: {detail}"),
373 sql_state: e.code().map(|c| c.code().to_string()),
374 }
375 })?;
376
377 txn.commit().await.map_err(|e| FraiseQLError::Database {
378 message: format!("Failed to commit mutation timing transaction: {e}"),
379 sql_state: e.code().map(|c| c.code().to_string()),
380 })?;
381
382 let results: Vec<std::collections::HashMap<String, serde_json::Value>> =
383 rows.iter().map(row_to_map).collect();
384
385 Ok(results)
386 } else {
387 let rows: Vec<Row> =
388 client.query(sql.as_str(), params.as_slice()).await.map_err(|e| {
389 let detail = e.as_db_error().map_or("", |d| d.message());
390 FraiseQLError::Database {
391 message: format!("Function call {function_name} failed: {e}: {detail}"),
392 sql_state: e.code().map(|c| c.code().to_string()),
393 }
394 })?;
395
396 let results: Vec<std::collections::HashMap<String, serde_json::Value>> =
397 rows.iter().map(row_to_map).collect();
398
399 Ok(results)
400 }
401 }
402
403 async fn set_session_variables(&self, variables: &[(&str, &str)]) -> Result<()> {
404 if variables.is_empty() {
405 return Ok(());
406 }
407 let client = self.acquire_connection_with_retry().await?;
408 for (name, value) in variables {
409 client
410 .execute("SELECT set_config($1, $2, true)", &[name, value])
411 .await
412 .map_err(|e| FraiseQLError::Database {
413 message: format!("set_config({name:?}) failed: {e}"),
414 sql_state: e.code().map(|c| c.code().to_string()),
415 })?;
416 }
417 Ok(())
418 }
419
420 async fn explain_query(
421 &self,
422 sql: &str,
423 _params: &[serde_json::Value],
424 ) -> Result<serde_json::Value> {
425 if sql.contains(';') {
429 return Err(FraiseQLError::Validation {
430 message: "EXPLAIN SQL must be a single statement".into(),
431 path: None,
432 });
433 }
434 let explain_sql = format!("EXPLAIN (ANALYZE false, FORMAT JSON) {sql}");
435 let client = self.acquire_connection_with_retry().await?;
436 let rows: Vec<Row> =
437 client
438 .query(explain_sql.as_str(), &[])
439 .await
440 .map_err(|e| FraiseQLError::Database {
441 message: format!("EXPLAIN failed: {e}"),
442 sql_state: e.code().map(|c| c.code().to_string()),
443 })?;
444
445 if let Some(row) = rows.first() {
446 let plan: serde_json::Value = row.try_get(0).map_err(|e| FraiseQLError::Database {
447 message: format!("Failed to parse EXPLAIN output: {e}"),
448 sql_state: None,
449 })?;
450 Ok(plan)
451 } else {
452 Ok(serde_json::Value::Null)
453 }
454 }
455
456 async fn query_stats(&self, limit: u32) -> Result<Vec<crate::types::QueryStatEntry>> {
457 self.pg_query_stats(limit).await
458 }
459
460 async fn query_stats_by_id(&self, id: &str) -> Result<Option<crate::types::QueryStatEntry>> {
461 self.pg_query_stats_by_id(id).await
462 }
463
464 async fn reset_query_stats(&self) -> Result<()> {
465 self.pg_reset_query_stats().await
466 }
467}
468
469impl SupportsMutations for PostgresAdapter {}