Skip to main content

fraiseql_db/postgres/adapter/
database.rs

1//! `DatabaseAdapter` and `SupportsMutations` implementations for `PostgresAdapter`.
2
3use async_trait::async_trait;
4use bytes::BufMut as _;
5use fraiseql_error::{FraiseQLError, Result};
6use tokio_postgres::Row;
7
8use super::{PostgresAdapter, build_where_select_sql, build_where_select_sql_ordered};
9use crate::{
10    identifier::quote_postgres_identifier,
11    traits::{DatabaseAdapter, SupportsMutations},
12    types::{
13        DatabaseType, JsonbValue, PoolMetrics, QueryParam,
14        sql_hints::{OrderByClause, SqlProjectionHint},
15    },
16    where_clause::WhereClause,
17};
18
19/// PostgreSQL SQLSTATE 42703: undefined column.
20#[allow(dead_code)]
21const PG_UNDEFINED_COLUMN: &str = "42703";
22
23/// A flexible SQL parameter that binds to any PostgreSQL type.
24///
25/// Solves the impedance mismatch between `serde_json::Value` (only accepts JSON/JSONB)
26/// and `Option<String>` (only accepts text-family types) when binding function-call
27/// arguments whose types are resolved at runtime from the function signature.
28///
29/// Serialisation strategy (binary wire format):
30/// - `JSONB`: 1-byte version header (1) + UTF-8 JSON bytes
31/// - `JSON`: UTF-8 JSON bytes
32/// - `UUID`: 16-byte big-endian UUID
33/// - `INT4`: 4-byte big-endian i32
34/// - `INT8`: 8-byte big-endian i64
35/// - `BOOL`: 1-byte (0 or 1)
36/// - All other types: UTF-8 bytes (PostgreSQL text binary = raw UTF-8)
37#[derive(Debug)]
38enum FlexParam {
39    /// SQL NULL — accepted by any PostgreSQL type.
40    Null,
41    /// A text-encoded value; binary-serialised according to the server-resolved type.
42    Text(String),
43}
44
45impl tokio_postgres::types::ToSql for FlexParam {
46    fn to_sql(
47        &self,
48        ty: &tokio_postgres::types::Type,
49        out: &mut bytes::BytesMut,
50    ) -> std::result::Result<tokio_postgres::types::IsNull, Box<dyn std::error::Error + Sync + Send>>
51    {
52        use tokio_postgres::types::{IsNull, Type};
53        match self {
54            Self::Null => Ok(IsNull::Yes),
55            Self::Text(s) => {
56                if *ty == Type::JSONB {
57                    // JSONB binary wire format: 1-byte version (1) + JSON bytes
58                    out.put_u8(1);
59                    out.extend_from_slice(s.as_bytes());
60                } else if *ty == Type::JSON {
61                    out.extend_from_slice(s.as_bytes());
62                } else if *ty == Type::UUID {
63                    let uuid = uuid::Uuid::parse_str(s)?;
64                    out.extend_from_slice(uuid.as_bytes());
65                } else if *ty == Type::INT4 {
66                    let n: i32 = s.parse()?;
67                    out.put_i32(n);
68                } else if *ty == Type::INT8 {
69                    let n: i64 = s.parse()?;
70                    out.put_i64(n);
71                } else if *ty == Type::BOOL {
72                    let b: bool = s.parse()?;
73                    out.put_u8(u8::from(b));
74                } else {
75                    // TEXT, VARCHAR, BPCHAR, NAME, UNKNOWN, and any user-defined type:
76                    // UTF-8 bytes are the binary wire representation for text-family types.
77                    out.extend_from_slice(s.as_bytes());
78                }
79                Ok(IsNull::No)
80            },
81        }
82    }
83
84    fn accepts(_ty: &tokio_postgres::types::Type) -> bool {
85        // Accepts all types; per-type serialisation is handled in `to_sql`.
86        true
87    }
88
89    fn to_sql_checked(
90        &self,
91        ty: &tokio_postgres::types::Type,
92        out: &mut bytes::BytesMut,
93    ) -> std::result::Result<tokio_postgres::types::IsNull, Box<dyn std::error::Error + Sync + Send>>
94    {
95        // `accepts()` returns true for all types, so the standard WrongType check is
96        // unnecessary.  Delegate directly to `to_sql`.
97        self.to_sql(ty, out)
98    }
99}
100
101/// Enrich a `FraiseQLError::Database` error for PostgreSQL SQLSTATE 42703 (undefined column)
102/// when the WHERE clause contains `NativeField` conditions.
103///
104/// Native columns may be inferred automatically at compile time from `ID`/`UUID`-typed
105/// arguments.  If the column does not exist on the target table at runtime, the raw
106/// PostgreSQL error is replaced with a diagnostic message that names the native columns
107/// involved and explains how to fix the schema.
108#[allow(dead_code)]
109fn enrich_undefined_column_error(
110    err: FraiseQLError,
111    view: &str,
112    where_clause: Option<&WhereClause>,
113) -> FraiseQLError {
114    let FraiseQLError::Database { ref sql_state, .. } = err else {
115        return err;
116    };
117    if sql_state.as_deref() != Some(PG_UNDEFINED_COLUMN) {
118        return err;
119    }
120    let native_cols: Vec<&str> =
121        where_clause.map(|wc| wc.native_column_names()).unwrap_or_default();
122    if native_cols.is_empty() {
123        return err;
124    }
125    FraiseQLError::Database {
126        message:   format!(
127            "Column(s) {:?} referenced as native column(s) on `{view}` do not exist. \
128             These columns were auto-inferred from ID/UUID-typed query arguments. \
129             Either add the column(s) to the table/view, or set \
130             `native_columns = {{}}` explicitly in your schema to disable inference.",
131            native_cols,
132        ),
133        sql_state: Some(PG_UNDEFINED_COLUMN.to_string()),
134    }
135}
136
137/// Decodes PostgreSQL ENUM columns as Rust `String` values.
138///
139/// `postgres-types 0.2.x` does not include `Kind::Enum` in `String::accepts`,
140/// so ENUM columns fall through all typed branches in `row_to_map` and land on
141/// `Null`.  This newtype accepts *only* ENUM types and decodes their wire bytes
142/// (always UTF-8 text in both text and binary protocols) as a plain String.
143struct EnumText(String);
144
145impl<'a> tokio_postgres::types::FromSql<'a> for EnumText {
146    fn from_sql(
147        _ty: &tokio_postgres::types::Type,
148        raw: &'a [u8],
149    ) -> std::result::Result<EnumText, Box<dyn std::error::Error + Sync + Send>> {
150        std::str::from_utf8(raw)
151            .map(|s| EnumText(s.to_owned()))
152            .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Sync + Send>)
153    }
154
155    fn accepts(ty: &tokio_postgres::types::Type) -> bool {
156        matches!(ty.kind(), tokio_postgres::types::Kind::Enum(_))
157    }
158}
159
160/// Convert a single `tokio_postgres::Row` into a `HashMap<String, serde_json::Value>`.
161///
162/// Tries each PostgreSQL type in priority order; falls back to `Null` for
163/// types that cannot be represented as JSON.
164///
165/// ## Supported Types
166///
167/// - **Numeric**: INT4 (i32), INT8 (i64), FLOAT8 (f64)
168/// - **Text**: TEXT, VARCHAR
169/// - **Boolean**: BOOL
170/// - **JSON**: JSONB, JSON
171/// - **Arrays**: TEXT[], VARCHAR[] (including NULL arrays)
172/// - **Enums**: PostgreSQL ENUM types (custom defined)
173///
174/// NULL values are represented as `serde_json::Value::Null` in all cases.
175///
176/// ## Known Gaps (Future Work)
177///
178/// The following PostgreSQL types currently fall through to `Null` and should be
179/// expanded in a future issue:
180///
181/// - Array types (INT4[], BOOL[], UUID[], FLOAT8[], etc.)
182/// - Temporal types (DATE, TIME, TIMESTAMP, TIMESTAMPTZ, INTERVAL)
183/// - UUID type
184/// - BYTEA
185/// - COMPOSITE types (besides mutation_response)
186/// - Range types (INT4RANGE, TSRANGE, etc.)
187/// - Network types (INET, CIDR, MACADDR, etc.)
188///
189/// See [fraiseql#XXX](https://github.com/getfraiseql/fraiseql/issues/XXX) for
190/// tracking expansion of type coverage.
191fn row_to_map(row: &Row) -> std::collections::HashMap<String, serde_json::Value> {
192    let mut map = std::collections::HashMap::new();
193    for (idx, column) in row.columns().iter().enumerate() {
194        let column_name = column.name().to_string();
195        let value: serde_json::Value = if let Ok(v) = row.try_get::<_, i32>(idx) {
196            serde_json::json!(v)
197        } else if let Ok(v) = row.try_get::<_, i64>(idx) {
198            serde_json::json!(v)
199        } else if let Ok(v) = row.try_get::<_, f64>(idx) {
200            serde_json::json!(v)
201        } else if let Ok(v) = row.try_get::<_, String>(idx) {
202            serde_json::json!(v)
203        } else if let Ok(v) = row.try_get::<_, bool>(idx) {
204            serde_json::json!(v)
205        } else if let Ok(v) = row.try_get::<_, serde_json::Value>(idx) {
206            v
207        } else if let Ok(v) = row.try_get::<_, Option<Vec<String>>>(idx) {
208            // Handle TEXT[] / VARCHAR[] columns (e.g. mutation_response.updated_fields).
209            // Must come after JSONB to avoid shadowing; TEXT[] is not a JSON type.
210            // Option wrapper lets NULL arrays return Ok(None) instead of Err,
211            // so they produce Value::Null intentionally rather than via the catch-all.
212            match v {
213                Some(arr) => serde_json::Value::Array(
214                    arr.into_iter().map(serde_json::Value::String).collect(),
215                ),
216                None => serde_json::Value::Null,
217            }
218        } else if let Ok(EnumText(v)) = row.try_get::<_, EnumText>(idx) {
219            // Handle PostgreSQL ENUM columns (e.g. mutation_response.error_class).
220            // postgres-types 0.2.x String::accepts() does not include Kind::Enum,
221            // so ENUMs fall through the String branch above.  ENUMs are always
222            // transmitted as UTF-8 text in both wire protocols.
223            serde_json::json!(v)
224        } else {
225            serde_json::Value::Null
226        };
227        map.insert(column_name, value);
228    }
229    map
230}
231
232// Reason: DatabaseAdapter is defined with #[async_trait]; all implementations must match
233// its transformed method signatures to satisfy the trait contract
234// async_trait: dyn-dispatch required; remove when RTN + Send is stable (RFC 3425)
235#[async_trait]
236impl DatabaseAdapter for PostgresAdapter {
237    async fn execute_with_projection(
238        &self,
239        view: &str,
240        projection: Option<&SqlProjectionHint>,
241        where_clause: Option<&WhereClause>,
242        limit: Option<u32>,
243        offset: Option<u32>,
244        order_by: Option<&[OrderByClause]>,
245        session_vars: &[(&str, &str)],
246    ) -> Result<Vec<JsonbValue>> {
247        self.execute_with_projection_impl(view, projection, where_clause, limit, offset, order_by, session_vars)
248            .await
249    }
250
251    async fn execute_where_query(
252        &self,
253        view: &str,
254        where_clause: Option<&WhereClause>,
255        limit: Option<u32>,
256        offset: Option<u32>,
257        order_by: Option<&[OrderByClause]>,
258        session_vars: &[(&str, &str)],
259    ) -> Result<Vec<JsonbValue>> {
260        let (sql, typed_params) =
261            build_where_select_sql_ordered(view, where_clause, limit, offset, order_by)?;
262
263        let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = typed_params
264            .iter()
265            .map(|p| p as &(dyn tokio_postgres::types::ToSql + Sync))
266            .collect();
267
268        let mut client = self.acquire_connection_with_retry().await?;
269
270        if !session_vars.is_empty() {
271            let txn = client.build_transaction().start().await.map_err(|e| FraiseQLError::Database {
272                message: format!("Failed to start transaction: {e}"),
273                sql_state: e.code().map(|c| c.code().to_string()),
274            })?;
275
276            // Set all session variables
277            for (name, value) in session_vars {
278                txn.execute("SELECT set_config($1, $2, true)", &[name, value]).await.map_err(|e| FraiseQLError::Database {
279                    message: format!("Failed to set session variable {name}: {e}"),
280                    sql_state: e.code().map(|c| c.code().to_string()),
281                })?;
282            }
283
284            // Execute query in same transaction
285            let rows = txn.query(&sql, &param_refs).await.map_err(|e| FraiseQLError::Database {
286                message: format!("Query execution failed: {e}"),
287                sql_state: e.code().map(|c| c.code().to_string()),
288            })?;
289            txn.commit().await.map_err(|e| FraiseQLError::Database {
290                message: format!("Failed to commit transaction: {e}"),
291                sql_state: e.code().map(|c| c.code().to_string()),
292            })?;
293
294            Ok(rows.iter().map(|row| {
295                let data: serde_json::Value = row.get(0);
296                JsonbValue::new(data)
297            }).collect())
298        } else {
299            // Fast path: no session vars, direct execution
300            let rows = client.query(&sql, &param_refs).await.map_err(|e| FraiseQLError::Database {
301                message: format!("Query execution failed: {e}"),
302                sql_state: e.code().map(|c| c.code().to_string()),
303            })?;
304            Ok(rows.iter().map(|row| {
305                let data: serde_json::Value = row.get(0);
306                JsonbValue::new(data)
307            }).collect())
308        }
309    }
310
311    async fn explain_where_query(
312        &self,
313        view: &str,
314        where_clause: Option<&WhereClause>,
315        limit: Option<u32>,
316        offset: Option<u32>,
317    ) -> Result<serde_json::Value> {
318        let (select_sql, typed_params) = build_where_select_sql(view, where_clause, limit, offset)?;
319        // Defense-in-depth: compiler-generated SQL should never contain a
320        // semicolon, but guard against it to prevent statement injection.
321        if select_sql.contains(';') {
322            return Err(FraiseQLError::Validation {
323                message: "EXPLAIN SQL must be a single statement".into(),
324                path:    None,
325            });
326        }
327        let explain_sql = format!("EXPLAIN (ANALYZE, BUFFERS, FORMAT JSON) {select_sql}");
328
329        let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = typed_params
330            .iter()
331            .map(|p| p as &(dyn tokio_postgres::types::ToSql + Sync))
332            .collect();
333
334        let client = self.acquire_connection_with_retry().await?;
335        let rows = client.query(explain_sql.as_str(), &param_refs).await.map_err(|e| {
336            FraiseQLError::Database {
337                message:   format!("EXPLAIN ANALYZE failed: {e}"),
338                sql_state: e.code().map(|c| c.code().to_string()),
339            }
340        })?;
341
342        if let Some(row) = rows.first() {
343            let plan: serde_json::Value = row.try_get(0).map_err(|e| FraiseQLError::Database {
344                message:   format!("Failed to parse EXPLAIN output: {e}"),
345                sql_state: None,
346            })?;
347            Ok(plan)
348        } else {
349            Ok(serde_json::Value::Null)
350        }
351    }
352
353    fn database_type(&self) -> DatabaseType {
354        DatabaseType::PostgreSQL
355    }
356
357    async fn health_check(&self) -> Result<()> {
358        // Use retry logic for health check to avoid false negatives during pool exhaustion
359        let client = self.acquire_connection_with_retry().await?;
360
361        client.query("SELECT 1", &[]).await.map_err(|e| FraiseQLError::Database {
362            message:   format!("Health check failed: {e}"),
363            sql_state: e.code().map(|c| c.code().to_string()),
364        })?;
365
366        Ok(())
367    }
368
369    #[allow(clippy::cast_possible_truncation)] // Reason: value is bounded; truncation cannot occur in practice
370    fn pool_metrics(&self) -> PoolMetrics {
371        let status = self.pool.status();
372
373        PoolMetrics {
374            total_connections:  status.size as u32,
375            idle_connections:   status.available as u32,
376            active_connections: (status.size - status.available) as u32,
377            waiting_requests:   status.waiting as u32,
378        }
379    }
380
381    /// # Security
382    ///
383    /// `sql` **must** be compiler-generated. Never pass user-supplied strings
384    /// directly — doing so would open SQL-injection vulnerabilities.
385    async fn execute_raw_query(
386        &self,
387        sql: &str,
388    ) -> Result<Vec<std::collections::HashMap<String, serde_json::Value>>> {
389        // Use retry logic for connection acquisition
390        let client = self.acquire_connection_with_retry().await?;
391
392        let rows: Vec<Row> = client.query(sql, &[]).await.map_err(|e| FraiseQLError::Database {
393            message:   format!("Query execution failed: {e}"),
394            sql_state: e.code().map(|c| c.code().to_string()),
395        })?;
396
397        // Convert each row to HashMap<String, Value>
398        let results: Vec<std::collections::HashMap<String, serde_json::Value>> =
399            rows.iter().map(row_to_map).collect();
400
401        Ok(results)
402    }
403
404    async fn execute_parameterized_aggregate(
405        &self,
406        sql: &str,
407        params: &[serde_json::Value],
408        session_vars: &[(&str, &str)],
409    ) -> Result<Vec<std::collections::HashMap<String, serde_json::Value>>> {
410        // Convert serde_json::Value params to QueryParam so that strings are bound
411        // as TEXT (not JSONB), which is required for correct WHERE comparisons against
412        // data->>'field' expressions that return TEXT.
413        let typed: Vec<QueryParam> = params.iter().cloned().map(QueryParam::from).collect();
414        let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> =
415            typed.iter().map(|p| p as &(dyn tokio_postgres::types::ToSql + Sync)).collect();
416
417        let mut client = self.acquire_connection_with_retry().await?;
418
419        if !session_vars.is_empty() {
420            let txn = client.build_transaction().start().await.map_err(|e| FraiseQLError::Database {
421                message: format!("Failed to start transaction: {e}"),
422                sql_state: e.code().map(|c| c.code().to_string()),
423            })?;
424
425            // Set all session variables
426            for (name, value) in session_vars {
427                txn.execute("SELECT set_config($1, $2, true)", &[name, value]).await.map_err(|e| FraiseQLError::Database {
428                    message: format!("Failed to set session variable {name}: {e}"),
429                    sql_state: e.code().map(|c| c.code().to_string()),
430                })?;
431            }
432
433            // Execute query in same transaction
434            let rows: Vec<Row> = txn.query(sql, &param_refs).await.map_err(|e| FraiseQLError::Database {
435                message: format!("Parameterized aggregate query failed: {e}"),
436                sql_state: e.code().map(|c| c.code().to_string()),
437            })?;
438            txn.commit().await.map_err(|e| FraiseQLError::Database {
439                message: format!("Failed to commit transaction: {e}"),
440                sql_state: e.code().map(|c| c.code().to_string()),
441            })?;
442
443            let results: Vec<std::collections::HashMap<String, serde_json::Value>> =
444                rows.iter().map(row_to_map).collect();
445
446            Ok(results)
447        } else {
448            // Fast path: no session vars, direct execution
449            let rows: Vec<Row> = client.query(sql, &param_refs).await.map_err(|e| FraiseQLError::Database {
450                message: format!("Parameterized aggregate query failed: {e}"),
451                sql_state: e.code().map(|c| c.code().to_string()),
452            })?;
453            let results: Vec<std::collections::HashMap<String, serde_json::Value>> =
454                rows.iter().map(row_to_map).collect();
455
456            Ok(results)
457        }
458    }
459
460    async fn execute_function_call(
461        &self,
462        function_name: &str,
463        args: &[serde_json::Value],
464        session_vars: &[(&str, &str)],
465    ) -> Result<Vec<std::collections::HashMap<String, serde_json::Value>>> {
466        // Build: SELECT * FROM "fn_name"($1, $2, ...)
467        // Use the standard identifier quoting utility so that schema-qualified
468        // names like "benchmark.fn_update_user" are correctly split into
469        // "benchmark"."fn_update_user" instead of being wrapped as a single
470        // identifier.
471        let quoted_fn = quote_postgres_identifier(function_name);
472        let placeholders: Vec<String> = (1..=args.len()).map(|i| format!("${i}")).collect();
473        let sql = format!("SELECT * FROM {quoted_fn}({})", placeholders.join(", "));
474
475        let mut client = self.acquire_connection_with_retry().await?;
476
477        // Convert serde_json::Value arguments to FlexParam for binding.
478        //
479        // serde_json::Value only accepts JSON/JSONB types; Option<String> only accepts
480        // text-family types.  Neither works universally when the function signature
481        // contains a mix of JSONB, UUID, INT4, and TEXT parameters.  FlexParam accepts
482        // all PostgreSQL types and serialises each value in the correct binary wire
483        // format for the server-resolved parameter type.
484        let flex_args: Vec<FlexParam> = args
485            .iter()
486            .map(|v| match v {
487                serde_json::Value::Null => FlexParam::Null,
488                serde_json::Value::String(s) => FlexParam::Text(s.clone()),
489                _ => FlexParam::Text(v.to_string()),
490            })
491            .collect();
492        let params: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = flex_args
493            .iter()
494            .map(|v| v as &(dyn tokio_postgres::types::ToSql + Sync))
495            .collect();
496
497        // Wrap in a transaction if there are session variables or timing is enabled.
498        // `set_config(name, value, is_local)` with is_local=true is equivalent to
499        // SET LOCAL and is parameterized to avoid SQL injection.
500        // Both session variables and the function call run in the same transaction
501        // on the same connection, so SET LOCAL is correctly scoped.
502        if !session_vars.is_empty() || self.mutation_timing_enabled {
503            let txn =
504                client.build_transaction().start().await.map_err(|e| FraiseQLError::Database {
505                    message:   format!("Failed to start mutation transaction: {e}"),
506                    sql_state: e.code().map(|c| c.code().to_string()),
507                })?;
508
509            // Set user-provided session variables first
510            for (name, value) in session_vars {
511                txn.execute("SELECT set_config($1, $2, true)", &[name, value])
512                    .await
513                    .map_err(|e| FraiseQLError::Database {
514                        message:   format!("Failed to set session variable {name}: {e}"),
515                        sql_state: e.code().map(|c| c.code().to_string()),
516                    })?;
517            }
518
519            // Then set the mutation timing variable if enabled
520            if self.mutation_timing_enabled {
521                txn.execute(
522                    "SELECT set_config($1, clock_timestamp()::text, true)",
523                    &[&self.timing_variable_name],
524                )
525                .await
526                .map_err(|e| FraiseQLError::Database {
527                    message:   format!("Failed to set mutation timing variable: {e}"),
528                    sql_state: e.code().map(|c| c.code().to_string()),
529                })?;
530            }
531
532            // Execute the function call within the same transaction
533            let rows: Vec<Row> = txn.query(sql.as_str(), params.as_slice()).await.map_err(|e| {
534                let detail = e.as_db_error().map_or("", |d| d.message());
535                FraiseQLError::Database {
536                    message:   format!("Function call {function_name} failed: {e}: {detail}"),
537                    sql_state: e.code().map(|c| c.code().to_string()),
538                }
539            })?;
540
541            txn.commit().await.map_err(|e| FraiseQLError::Database {
542                message:   format!("Failed to commit mutation transaction: {e}"),
543                sql_state: e.code().map(|c| c.code().to_string()),
544            })?;
545
546            let results: Vec<std::collections::HashMap<String, serde_json::Value>> =
547                rows.iter().map(row_to_map).collect();
548
549            Ok(results)
550        } else {
551            // Fast path: no session variables or timing — execute directly without transaction
552            let rows: Vec<Row> =
553                client.query(sql.as_str(), params.as_slice()).await.map_err(|e| {
554                    let detail = e.as_db_error().map_or("", |d| d.message());
555                    FraiseQLError::Database {
556                        message:   format!("Function call {function_name} failed: {e}: {detail}"),
557                        sql_state: e.code().map(|c| c.code().to_string()),
558                    }
559                })?;
560
561            let results: Vec<std::collections::HashMap<String, serde_json::Value>> =
562                rows.iter().map(row_to_map).collect();
563
564            Ok(results)
565        }
566    }
567
568    async fn set_session_variables(&self, variables: &[(&str, &str)]) -> Result<()> {
569        if variables.is_empty() {
570            return Ok(());
571        }
572        let client = self.acquire_connection_with_retry().await?;
573        for (name, value) in variables {
574            client
575                .execute("SELECT set_config($1, $2, true)", &[name, value])
576                .await
577                .map_err(|e| FraiseQLError::Database {
578                    message:   format!("set_config({name:?}) failed: {e}"),
579                    sql_state: e.code().map(|c| c.code().to_string()),
580                })?;
581        }
582        Ok(())
583    }
584
585    async fn explain_query(
586        &self,
587        sql: &str,
588        _params: &[serde_json::Value],
589    ) -> Result<serde_json::Value> {
590        // Defense-in-depth: reject multi-statement input even though this SQL is
591        // compiler-generated. A semicolon would allow a second statement to be
592        // appended to the EXPLAIN prefix.
593        if sql.contains(';') {
594            return Err(FraiseQLError::Validation {
595                message: "EXPLAIN SQL must be a single statement".into(),
596                path:    None,
597            });
598        }
599        let explain_sql = format!("EXPLAIN (ANALYZE false, FORMAT JSON) {sql}");
600        let client = self.acquire_connection_with_retry().await?;
601        let rows: Vec<Row> =
602            client
603                .query(explain_sql.as_str(), &[])
604                .await
605                .map_err(|e| FraiseQLError::Database {
606                    message:   format!("EXPLAIN failed: {e}"),
607                    sql_state: e.code().map(|c| c.code().to_string()),
608                })?;
609
610        if let Some(row) = rows.first() {
611            let plan: serde_json::Value = row.try_get(0).map_err(|e| FraiseQLError::Database {
612                message:   format!("Failed to parse EXPLAIN output: {e}"),
613                sql_state: None,
614            })?;
615            Ok(plan)
616        } else {
617            Ok(serde_json::Value::Null)
618        }
619    }
620}
621
622impl SupportsMutations for PostgresAdapter {}