Skip to main content

fraiseql_core/runtime/executor/
query.rs

1//! Regular and relay query execution.
2
3use std::sync::Arc;
4
5use super::{Executor, null_masked_fields, resolve_inject_value};
6use crate::{
7    db::{
8        CursorValue, ProjectionField, WhereClause, WhereOperator,
9        projection_generator::{FieldKind, PostgresProjectionGenerator},
10        traits::DatabaseAdapter,
11    },
12    error::{FraiseQLError, Result},
13    graphql::FieldSelection,
14    runtime::{JsonbStrategy, ResultProjector},
15    schema::{CompiledSchema, SqlProjectionHint},
16    security::{RlsWhereClause, SecurityContext},
17};
18
19/// Build a recursive [`ProjectionField`] tree from a GraphQL selection set.
20///
21/// For each field in `selections`, consults the compiled schema to determine
22/// whether the field is composite (Object) or scalar, and — for Object fields —
23/// recurses into the requested sub-fields to produce a nested
24/// `jsonb_build_object(...)` at the SQL level instead of returning the full blob.
25///
26/// List fields always fall back to `data->'field'` (full blob) because
27/// sub-projection inside aggregated JSONB arrays is out of scope.
28///
29/// Recursion is capped at 4 levels, matching `MAX_PROJECTION_DEPTH` in the
30/// projection generator.
31///
32/// Filter `__typename` from SQL projection fields.
33/// `__typename` is a GraphQL meta-field not stored in JSONB.
34/// The `ResultProjector` handles injection — see `projection.rs`.
35/// Removing this filter causes `data->>'__typename'` (NULL) to overwrite
36/// the value injected by `with_typename()`, depending on field iteration order.
37fn build_typed_projection_fields(
38    selections: &[FieldSelection],
39    schema: &CompiledSchema,
40    parent_type_name: &str,
41    depth: usize,
42) -> Vec<ProjectionField> {
43    const MAX_DEPTH: usize = 4;
44
45    let type_def = schema.find_type(parent_type_name);
46    selections
47        .iter()
48        // Skip __typename — it is a GraphQL meta-field not stored in the JSONB column.
49        // Including it would generate `data->>'__typename'` (always NULL) in the SQL
50        // projection and then overwrite the value already injected by `with_typename`.
51        .filter(|sel| sel.name != "__typename")
52        .map(|sel| {
53            let field_def =
54                type_def.and_then(|td| td.fields.iter().find(|f| f.name == sel.name.as_str()));
55
56            let is_composite = field_def.is_some_and(|fd| !fd.field_type.is_scalar());
57            let is_list = field_def.is_some_and(|fd| fd.field_type.is_list());
58            let is_text = field_def.is_some_and(|fd| {
59                matches!(
60                    fd.field_type,
61                    crate::schema::FieldType::String | crate::schema::FieldType::Id
62                )
63            });
64
65            let kind = if is_composite {
66                FieldKind::Composite
67            } else if is_text {
68                FieldKind::Text
69            } else {
70                FieldKind::Native
71            };
72
73            // Recurse into Object types only — List fields fall back to full blob
74            let sub_fields =
75                if is_composite && !is_list && !sel.nested_fields.is_empty() && depth < MAX_DEPTH {
76                    let child_type =
77                        field_def.and_then(|fd| fd.field_type.type_name()).unwrap_or("");
78                    if child_type.is_empty() {
79                        None
80                    } else {
81                        Some(build_typed_projection_fields(
82                            &sel.nested_fields,
83                            schema,
84                            child_type,
85                            depth + 1,
86                        ))
87                    }
88                } else {
89                    None
90                };
91
92            ProjectionField {
93                name: sel.response_key().to_string(),
94                kind,
95                sub_fields,
96            }
97        })
98        .collect()
99}
100
101/// Map a schema [`FieldType`] to the ORDER BY cast hint.
102///
103/// Returns [`OrderByFieldType::Text`] for types that sort correctly as text
104/// (strings, UUIDs, enums) or for composite/container types where a cast
105/// would be meaningless.
106const fn field_type_to_order_by_type(ft: &crate::schema::FieldType) -> crate::db::OrderByFieldType {
107    use crate::{db::OrderByFieldType as OB, schema::FieldType as FT};
108    match ft {
109        FT::Int => OB::Integer,
110        FT::Float | FT::Decimal => OB::Numeric,
111        FT::Boolean => OB::Boolean,
112        FT::DateTime => OB::DateTime,
113        FT::Date => OB::Date,
114        FT::Time => OB::Time,
115        // String, ID, UUID, Json, Enum, Scalar, and container types sort as text.
116        _ => OB::Text,
117    }
118}
119
120/// Enrich parsed `OrderByClause` values with schema-derived type information
121/// and native column mappings.
122///
123/// For each clause, looks up the field in the compiled schema's type definition
124/// to determine the correct `OrderByFieldType` (so the SQL generator emits a
125/// typed cast), and checks `native_columns` for a direct column mapping (so the
126/// SQL generator can bypass JSONB extraction entirely).
127fn enrich_order_by_clauses(
128    mut clauses: Vec<crate::db::OrderByClause>,
129    schema: &CompiledSchema,
130    return_type: &str,
131    native_columns: &std::collections::HashMap<String, String>,
132) -> Vec<crate::db::OrderByClause> {
133    let type_def = schema.find_type(return_type);
134    for clause in &mut clauses {
135        // Look up the field type from the schema definition.
136        if let Some(td) = type_def {
137            if let Some(field_def) = td.find_field(&clause.field) {
138                clause.field_type = field_type_to_order_by_type(&field_def.field_type);
139            }
140        }
141
142        // Check if the query definition has a native column mapping for this field.
143        // `native_columns` keys are the GraphQL argument names (camelCase).
144        let storage_key = clause.storage_key();
145        if native_columns.contains_key(&storage_key) {
146            clause.native_column = Some(storage_key);
147        }
148    }
149    clauses
150}
151
152impl<A: DatabaseAdapter> Executor<A> {
153    /// Execute a regular query with row-level security (RLS) filtering.
154    ///
155    /// This method:
156    /// 1. Validates the user's security context (token expiration, etc.)
157    /// 2. Evaluates RLS policies to determine what rows the user can access
158    /// 3. Composes RLS filters with user-provided WHERE clauses
159    /// 4. Passes the composed filter to the database adapter for SQL-level filtering
160    ///
161    /// RLS filtering happens at the database level, not in Rust, ensuring:
162    /// - High performance (database can optimize filters)
163    /// - Correct handling of pagination (LIMIT applied after RLS filtering)
164    /// - Type-safe composition via `WhereClause` enum
165    ///
166    /// # Errors
167    ///
168    /// * [`FraiseQLError::Validation`] — security token expired, role check failed, or query not
169    ///   found in schema.
170    /// * [`FraiseQLError::Database`] — the database adapter returned an error.
171    pub(super) async fn execute_regular_query_with_security(
172        &self,
173        query: &str,
174        variables: Option<&serde_json::Value>,
175        security_context: &SecurityContext,
176    ) -> Result<serde_json::Value> {
177        // 1. Validate security context (check expiration, etc.)
178        if security_context.is_expired() {
179            return Err(FraiseQLError::Validation {
180                message: "Security token has expired".to_string(),
181                path:    Some("request.authorization".to_string()),
182            });
183        }
184
185        // 2. Match query to compiled template
186        let query_match = self.matcher.match_query(query, variables)?;
187
188        // 2b. Enforce requires_role — return "not found" (not "forbidden") to prevent enumeration
189        if let Some(ref required_role) = query_match.query_def.requires_role {
190            if !security_context.roles.iter().any(|r| r == required_role) {
191                return Err(FraiseQLError::Validation {
192                    message: format!("Query '{}' not found in schema", query_match.query_def.name),
193                    path:    None,
194                });
195            }
196        }
197
198        // Inject session variables (transaction-scoped set_config) when configured.
199        //
200        // Must run before any DB execution (including the relay branch below) so that
201        // PostgreSQL-native Row Level Security policies that rely on `current_setting()`
202        // values (e.g. `app.tenant_id`) are effective for read queries, matching the
203        // behaviour already in place for mutations.
204        {
205            let sv = &self.schema.session_variables;
206            if !sv.variables.is_empty() || sv.inject_started_at {
207                let vars = crate::runtime::executor::security::resolve_session_variables(
208                    sv,
209                    security_context,
210                );
211                if !vars.is_empty() {
212                    let pairs: Vec<(&str, &str)> =
213                        vars.iter().map(|(k, v)| (k.as_str(), v.as_str())).collect();
214                    self.adapter.set_session_variables(&pairs).await?;
215                }
216            }
217        }
218
219        // Route relay queries to dedicated handler with security context.
220        if query_match.query_def.relay {
221            return self.execute_relay_query(&query_match, variables, Some(security_context)).await;
222        }
223
224        // 0. Check response cache (skips all projection/RBAC/serialization work on hit)
225        let response_cache_key =
226            self.response_cache.as_ref().filter(|rc| rc.is_enabled()).map(|_| {
227                let query_key = Self::compute_response_cache_key(&query_match);
228                let sec_hash =
229                    crate::cache::response_cache::hash_security_context(Some(security_context));
230                (query_key, sec_hash)
231            });
232
233        if let (Some((query_key, sec_hash)), Some(rc)) =
234            (response_cache_key, self.response_cache.as_ref())
235        {
236            if let Some(cached) = rc.get(query_key, sec_hash)? {
237                return Ok((*cached).clone());
238            }
239        }
240
241        // 3. Create execution plan
242        let plan = self.planner.plan(&query_match)?;
243
244        // 4. Evaluate RLS policy and build WHERE clause filter. The return type is
245        //    Option<RlsWhereClause> — a compile-time proof that the clause passed through RLS
246        //    evaluation.
247        let rls_where_clause: Option<RlsWhereClause> =
248            if let Some(ref rls_policy) = self.config.rls_policy {
249                // Evaluate RLS policy with user's security context
250                rls_policy.evaluate(security_context, &query_match.query_def.name)?
251            } else {
252                // No RLS policy configured, allow all access
253                None
254            };
255
256        // 5. Get SQL source from query definition
257        let sql_source =
258            query_match
259                .query_def
260                .sql_source
261                .as_ref()
262                .ok_or_else(|| FraiseQLError::Validation {
263                    message: "Query has no SQL source".to_string(),
264                    path:    None,
265                })?;
266
267        // 6. Generate SQL projection hint for requested fields (optimization). Build a recursive
268        //    ProjectionField tree from the selection set so that composite sub-fields are projected
269        //    with nested jsonb_build_object instead of returning the full blob.
270        let projection_hint = if !plan.projection_fields.is_empty()
271            && plan.jsonb_strategy == JsonbStrategy::Project
272        {
273            let root_fields = query_match
274                .selections
275                .first()
276                .map_or(&[] as &[_], |s| s.nested_fields.as_slice());
277            let typed_fields = build_typed_projection_fields(
278                root_fields,
279                &self.schema,
280                &query_match.query_def.return_type,
281                0,
282            );
283
284            let generator = PostgresProjectionGenerator::new();
285            let projection_sql = generator
286                .generate_typed_projection_sql(&typed_fields)
287                .unwrap_or_else(|_| "data".to_string());
288
289            Some(SqlProjectionHint {
290                database:                    self.adapter.database_type(),
291                projection_template:         projection_sql,
292                estimated_reduction_percent: compute_projection_reduction(
293                    plan.projection_fields.len(),
294                ),
295            })
296        } else {
297            // Stream strategy: return full JSONB, no projection hint
298            None
299        };
300
301        // 7. AND inject conditions onto the RLS WHERE clause. Inject conditions always come after
302        //    RLS so they cannot bypass it.
303        let combined_where: Option<WhereClause> = if query_match.query_def.inject_params.is_empty()
304        {
305            // Common path: unwrap RlsWhereClause into WhereClause for the adapter
306            rls_where_clause.map(RlsWhereClause::into_where_clause)
307        } else {
308            let mut conditions: Vec<WhereClause> = query_match
309                .query_def
310                .inject_params
311                .iter()
312                .map(|(col, source)| {
313                    let value = resolve_inject_value(col, source, security_context)?;
314                    Ok(inject_param_where_clause(col, value, &query_match.query_def.native_columns))
315                })
316                .collect::<Result<Vec<_>>>()?;
317
318            if let Some(rls) = rls_where_clause {
319                conditions.insert(0, rls.into_where_clause());
320            }
321            match conditions.len() {
322                0 => None,
323                1 => Some(conditions.remove(0)),
324                _ => Some(WhereClause::And(conditions)),
325            }
326        };
327
328        // 5b. Compose user-supplied WHERE from GraphQL arguments when has_where is enabled.
329        //     Security conditions (RLS + inject) are always first so they cannot be bypassed.
330        let combined_where: Option<WhereClause> = if query_match.query_def.auto_params.has_where {
331            let user_where = query_match
332                .arguments
333                .get("where")
334                .map(WhereClause::from_graphql_json)
335                .transpose()?;
336            match (combined_where, user_where) {
337                (None, None) => None,
338                (Some(sec), None) => Some(sec),
339                (None, Some(user)) => Some(user),
340                (Some(sec), Some(user)) => Some(WhereClause::And(vec![sec, user])),
341            }
342        } else {
343            combined_where
344        };
345
346        // 5c. Convert explicit query arguments (e.g. id, slug) to WHERE conditions.
347        //     This handles single-entity lookups like `user(id: "...")` where the
348        //     arguments are direct equality filters, not the structured `where` argument.
349        let combined_where = combine_explicit_arg_where(
350            combined_where,
351            &query_match.query_def.arguments,
352            &query_match.arguments,
353            &query_match.query_def.native_columns,
354        );
355
356        // 8. Extract limit/offset from query arguments when auto_params are enabled
357        let limit = if query_match.query_def.auto_params.has_limit {
358            query_match
359                .arguments
360                .get("limit")
361                .and_then(|v| v.as_u64())
362                .and_then(|v| u32::try_from(v).ok())
363        } else {
364            None
365        };
366
367        let offset = if query_match.query_def.auto_params.has_offset {
368            query_match
369                .arguments
370                .get("offset")
371                .and_then(|v| v.as_u64())
372                .and_then(|v| u32::try_from(v).ok())
373        } else {
374            None
375        };
376
377        // 8b. Extract order_by from query arguments when has_order_by is enabled,
378        //     then enrich each clause with the schema field type so the SQL generator
379        //     emits correct type casts (e.g., `(data->>'amount')::numeric`).
380        let order_by_clauses = if query_match.query_def.auto_params.has_order_by {
381            query_match
382                .arguments
383                .get("orderBy")
384                .map(crate::db::OrderByClause::from_graphql_json)
385                .transpose()?
386                .map(|clauses| {
387                    enrich_order_by_clauses(
388                        clauses,
389                        &self.schema,
390                        &query_match.query_def.return_type,
391                        &query_match.query_def.native_columns,
392                    )
393                })
394        } else {
395            None
396        };
397
398        // 9. Execute query with combined WHERE clause filter
399        let results = self
400            .adapter
401            .execute_with_projection_arc(
402                sql_source,
403                projection_hint.as_ref(),
404                combined_where.as_ref(),
405                limit,
406                offset,
407                order_by_clauses.as_deref(),
408            )
409            .await?;
410
411        // 10. Apply field-level RBAC filtering (reject / mask / allow)
412        let access = self.apply_field_rbac_filtering(
413            &query_match.query_def.return_type,
414            plan.projection_fields,
415            security_context,
416        )?;
417
418        // 11. Project results — include both allowed and masked fields in projection
419        let mut all_projection_fields = access.allowed;
420        all_projection_fields.extend(access.masked.iter().cloned());
421        let projector = ResultProjector::new(all_projection_fields)
422            .configure_typename_from_selections(
423                &query_match.selections,
424                &query_match.query_def.return_type,
425            );
426        let mut projected =
427            projector.project_results(&results, query_match.query_def.returns_list)?;
428
429        // 11. Null out masked fields in the projected result
430        if !access.masked.is_empty() {
431            null_masked_fields(&mut projected, &access.masked);
432        }
433
434        // 12. Wrap in GraphQL data envelope
435        let response =
436            ResultProjector::wrap_in_data_envelope(projected, &query_match.query_def.name);
437
438        // 13. Store in response cache (if enabled) and return value
439        if let (Some((query_key, sec_hash)), Some(rc)) =
440            (response_cache_key, self.response_cache.as_ref())
441        {
442            let sql_source = query_match.query_def.sql_source.as_deref().unwrap_or("");
443            let _ = rc.put(
444                query_key,
445                sec_hash,
446                Arc::new(response.clone()),
447                vec![sql_source.to_string()],
448            );
449        }
450
451        Ok(response)
452    }
453
454    /// Compute a response cache key from a query match.
455    ///
456    /// Hashes the query name, matched fields, and arguments to produce
457    /// a u64 key. Combined with the security context hash, this forms
458    /// the full response cache key.
459    fn compute_response_cache_key(query_match: &crate::runtime::matcher::QueryMatch) -> u64 {
460        use std::hash::{Hash, Hasher};
461        let mut hasher = ahash::AHasher::default();
462        query_match.query_def.name.hash(&mut hasher);
463        for field in &query_match.fields {
464            field.hash(&mut hasher);
465        }
466        // Hash arguments (sorted keys for determinism)
467        let mut keys: Vec<&String> = query_match.arguments.keys().collect();
468        keys.sort();
469        for key in keys {
470            key.hash(&mut hasher);
471            serde_json::to_string(&query_match.arguments[key])
472                .unwrap_or_default()
473                .hash(&mut hasher);
474        }
475        hasher.finish()
476    }
477
478    /// Execute a regular (non-aggregate, non-relay) GraphQL query.
479    ///
480    /// # Errors
481    ///
482    /// Returns [`FraiseQLError::Validation`] if the query does not match a compiled
483    /// template or requires a security context that is not present.
484    /// Returns [`FraiseQLError::Database`] if the SQL execution or result projection fails.
485    pub(super) async fn execute_regular_query(
486        &self,
487        query: &str,
488        variables: Option<&serde_json::Value>,
489    ) -> Result<serde_json::Value> {
490        // 1. Match query to compiled template
491        let query_match = self.matcher.match_query(query, variables)?;
492
493        // Guard: role-restricted queries are invisible to unauthenticated users
494        if query_match.query_def.requires_role.is_some() {
495            return Err(FraiseQLError::Validation {
496                message: format!("Query '{}' not found in schema", query_match.query_def.name),
497                path:    None,
498            });
499        }
500
501        // Guard: queries with inject params require a security context.
502        if !query_match.query_def.inject_params.is_empty() {
503            return Err(FraiseQLError::Validation {
504                message: format!(
505                    "Query '{}' has inject params but was called without a security context",
506                    query_match.query_def.name
507                ),
508                path:    None,
509            });
510        }
511
512        // Route relay queries to dedicated handler.
513        if query_match.query_def.relay {
514            return self.execute_relay_query(&query_match, variables, None).await;
515        }
516
517        // 2. Create execution plan
518        let plan = self.planner.plan(&query_match)?;
519
520        // 3. Execute SQL query
521        let sql_source = query_match.query_def.sql_source.as_ref().ok_or_else(|| {
522            crate::error::FraiseQLError::Validation {
523                message: "Query has no SQL source".to_string(),
524                path:    None,
525            }
526        })?;
527
528        // 3a. Generate SQL projection hint for requested fields (optimization).
529        //     Recursive typed projection: composite sub-fields are projected with nested
530        //     jsonb_build_object instead of returning the full blob.
531        let projection_hint = if !plan.projection_fields.is_empty()
532            && plan.jsonb_strategy == JsonbStrategy::Project
533        {
534            let root_fields = query_match
535                .selections
536                .first()
537                .map_or(&[] as &[_], |s| s.nested_fields.as_slice());
538            let typed_fields = build_typed_projection_fields(
539                root_fields,
540                &self.schema,
541                &query_match.query_def.return_type,
542                0,
543            );
544            let generator = PostgresProjectionGenerator::new();
545            let projection_sql = generator
546                .generate_typed_projection_sql(&typed_fields)
547                .unwrap_or_else(|_| "data".to_string());
548
549            Some(SqlProjectionHint {
550                database:                    self.adapter.database_type(),
551                projection_template:         projection_sql,
552                estimated_reduction_percent: compute_projection_reduction(
553                    plan.projection_fields.len(),
554                ),
555            })
556        } else {
557            // Stream strategy: return full JSONB, no projection hint
558            None
559        };
560
561        // 3b. Extract auto_params (limit, offset, where, order_by) from arguments
562        let user_where: Option<WhereClause> = if query_match.query_def.auto_params.has_where {
563            query_match
564                .arguments
565                .get("where")
566                .map(WhereClause::from_graphql_json)
567                .transpose()?
568        } else {
569            None
570        };
571
572        // 3c. Convert explicit query arguments (e.g. id, slug) to WHERE conditions.
573        let user_where = combine_explicit_arg_where(
574            user_where,
575            &query_match.query_def.arguments,
576            &query_match.arguments,
577            &query_match.query_def.native_columns,
578        );
579
580        let limit = if query_match.query_def.auto_params.has_limit {
581            query_match
582                .arguments
583                .get("limit")
584                .and_then(|v| v.as_u64())
585                .and_then(|v| u32::try_from(v).ok())
586        } else {
587            None
588        };
589
590        let offset = if query_match.query_def.auto_params.has_offset {
591            query_match
592                .arguments
593                .get("offset")
594                .and_then(|v| v.as_u64())
595                .and_then(|v| u32::try_from(v).ok())
596        } else {
597            None
598        };
599
600        let order_by_clauses = if query_match.query_def.auto_params.has_order_by {
601            query_match
602                .arguments
603                .get("orderBy")
604                .map(crate::db::OrderByClause::from_graphql_json)
605                .transpose()?
606                .map(|clauses| {
607                    enrich_order_by_clauses(
608                        clauses,
609                        &self.schema,
610                        &query_match.query_def.return_type,
611                        &query_match.query_def.native_columns,
612                    )
613                })
614        } else {
615            None
616        };
617
618        let results = self
619            .adapter
620            .execute_with_projection_arc(
621                sql_source,
622                projection_hint.as_ref(),
623                user_where.as_ref(),
624                limit,
625                offset,
626                order_by_clauses.as_deref(),
627            )
628            .await?;
629
630        // 4. Project results
631        let projector = ResultProjector::new(plan.projection_fields)
632            .configure_typename_from_selections(
633                &query_match.selections,
634                &query_match.query_def.return_type,
635            );
636        let projected = projector.project_results(&results, query_match.query_def.returns_list)?;
637
638        // 5. Wrap in GraphQL data envelope
639        let response =
640            ResultProjector::wrap_in_data_envelope(projected, &query_match.query_def.name);
641
642        // 6. Serialize to JSON string
643        Ok(response)
644    }
645
646    /// Execute a pre-built `QueryMatch` directly, bypassing GraphQL string parsing.
647    ///
648    /// Used by the REST transport for embedded sub-queries and NDJSON streaming
649    /// where the query parameters are already resolved from HTTP request parameters.
650    ///
651    /// # Errors
652    ///
653    /// Returns `FraiseQLError::Validation` if the query has no SQL source.
654    /// Returns `FraiseQLError::Database` if the adapter returns an error.
655    pub async fn execute_query_direct(
656        &self,
657        query_match: &crate::runtime::matcher::QueryMatch,
658        _variables: Option<&serde_json::Value>,
659        security_context: Option<&SecurityContext>,
660    ) -> Result<serde_json::Value> {
661        // Evaluate RLS policy if present.
662        let rls_where_clause: Option<RlsWhereClause> = if let (Some(ref rls_policy), Some(ctx)) =
663            (&self.config.rls_policy, security_context)
664        {
665            rls_policy.evaluate(ctx, &query_match.query_def.name)?
666        } else {
667            None
668        };
669
670        // Get SQL source.
671        let sql_source =
672            query_match
673                .query_def
674                .sql_source
675                .as_ref()
676                .ok_or_else(|| FraiseQLError::Validation {
677                    message: "Query has no SQL source".to_string(),
678                    path:    None,
679                })?;
680
681        // Build execution plan.
682        let plan = self.planner.plan(query_match)?;
683
684        // Extract auto_params from arguments.
685        let user_where: Option<WhereClause> = if query_match.query_def.auto_params.has_where {
686            query_match
687                .arguments
688                .get("where")
689                .map(WhereClause::from_graphql_json)
690                .transpose()?
691        } else {
692            None
693        };
694
695        let limit = query_match
696            .arguments
697            .get("limit")
698            .and_then(|v| v.as_u64())
699            .and_then(|v| u32::try_from(v).ok());
700
701        let offset = query_match
702            .arguments
703            .get("offset")
704            .and_then(|v| v.as_u64())
705            .and_then(|v| u32::try_from(v).ok());
706
707        let order_by_clauses = query_match
708            .arguments
709            .get("orderBy")
710            .map(crate::db::OrderByClause::from_graphql_json)
711            .transpose()?
712            .map(|clauses| {
713                enrich_order_by_clauses(
714                    clauses,
715                    &self.schema,
716                    &query_match.query_def.return_type,
717                    &query_match.query_def.native_columns,
718                )
719            });
720
721        // Convert explicit arguments to WHERE conditions.
722        let user_where = combine_explicit_arg_where(
723            user_where,
724            &query_match.query_def.arguments,
725            &query_match.arguments,
726            &query_match.query_def.native_columns,
727        );
728
729        // Compose RLS and user WHERE clauses.
730        let composed_where = match (&rls_where_clause, &user_where) {
731            (Some(rls), Some(user)) => {
732                Some(WhereClause::And(vec![rls.as_where_clause().clone(), user.clone()]))
733            },
734            (Some(rls), None) => Some(rls.as_where_clause().clone()),
735            (None, Some(user)) => Some(user.clone()),
736            (None, None) => None,
737        };
738
739        // Inject security-derived params.
740        if !query_match.query_def.inject_params.is_empty() {
741            if let Some(ctx) = security_context {
742                for (param_name, source) in &query_match.query_def.inject_params {
743                    let _value = resolve_inject_value(param_name, source, ctx)?;
744                    // Injected params are applied at the SQL level via WHERE clauses,
745                    // not via GraphQL variables, so no mutation of variables is needed here.
746                }
747            }
748        }
749
750        // Execute.
751        let results = self
752            .adapter
753            .execute_with_projection_arc(
754                sql_source,
755                None,
756                composed_where.as_ref(),
757                limit,
758                offset,
759                order_by_clauses.as_deref(),
760            )
761            .await?;
762
763        // Project results.
764        let projector = ResultProjector::new(plan.projection_fields)
765            .configure_typename_from_selections(
766                &query_match.selections,
767                &query_match.query_def.return_type,
768            );
769        let projected = projector.project_results(&results, query_match.query_def.returns_list)?;
770
771        // Wrap in GraphQL data envelope.
772        let response =
773            ResultProjector::wrap_in_data_envelope(projected, &query_match.query_def.name);
774
775        Ok(response)
776    }
777
778    /// Execute a mutation with security context for REST transport.
779    ///
780    /// Delegates to the standard mutation execution path with RLS enforcement.
781    ///
782    /// # Errors
783    ///
784    /// Returns `FraiseQLError::Database` if the adapter returns an error.
785    /// Returns `FraiseQLError::Validation` if inject params require a missing security context.
786    pub async fn execute_mutation_with_security(
787        &self,
788        mutation_name: &str,
789        arguments: &serde_json::Value,
790        security_context: Option<&crate::security::SecurityContext>,
791    ) -> crate::error::Result<serde_json::Value> {
792        // Build a synthetic GraphQL mutation query and delegate to execute()
793        let args_str = if let Some(obj) = arguments.as_object() {
794            obj.iter().map(|(k, v)| format!("{k}: {v}")).collect::<Vec<_>>().join(", ")
795        } else {
796            String::new()
797        };
798        let query = if args_str.is_empty() {
799            format!("mutation {{ {mutation_name} {{ status entity_id message }} }}")
800        } else {
801            format!("mutation {{ {mutation_name}({args_str}) {{ status entity_id message }} }}")
802        };
803
804        if let Some(ctx) = security_context {
805            self.execute_with_security(&query, None, ctx).await
806        } else {
807            self.execute(&query, None).await
808        }
809    }
810
811    /// Execute a batch of mutations (for REST bulk insert).
812    ///
813    /// Executes each mutation individually and collects results into a `BulkResult`.
814    ///
815    /// # Errors
816    ///
817    /// Returns the first error encountered during batch execution.
818    pub async fn execute_mutation_batch(
819        &self,
820        mutation_name: &str,
821        items: &[serde_json::Value],
822        security_context: Option<&crate::security::SecurityContext>,
823    ) -> crate::error::Result<crate::runtime::BulkResult> {
824        let mut entities = Vec::with_capacity(items.len());
825        for item in items {
826            let result = self
827                .execute_mutation_with_security(mutation_name, item, security_context)
828                .await?;
829            entities.push(result);
830        }
831        Ok(crate::runtime::BulkResult {
832            affected_rows: entities.len() as u64,
833            entities:      Some(entities),
834        })
835    }
836
837    /// Execute a bulk operation (collection-level PATCH/DELETE) by filter.
838    ///
839    /// # Errors
840    ///
841    /// Returns `FraiseQLError::Database` if the adapter returns an error.
842    pub async fn execute_bulk_by_filter(
843        &self,
844        query_match: &crate::runtime::matcher::QueryMatch,
845        mutation_name: &str,
846        body: Option<&serde_json::Value>,
847        _id_field: &str,
848        _max_affected: u64,
849        security_context: Option<&SecurityContext>,
850    ) -> crate::error::Result<crate::runtime::BulkResult> {
851        // Execute the filter query to find matching rows.
852        let filter_result = self.execute_query_direct(query_match, None, security_context).await?;
853
854        let args = body.cloned().unwrap_or(serde_json::json!({}));
855        let result = self
856            .execute_mutation_with_security(mutation_name, &args, security_context)
857            .await?;
858
859        let count = filter_result
860            .get("data")
861            .and_then(|d| d.as_object())
862            .and_then(|o| o.values().next())
863            .and_then(|v| v.as_array())
864            .map_or(1, |a| a.len() as u64);
865
866        Ok(crate::runtime::BulkResult {
867            affected_rows: count,
868            entities:      Some(vec![result]),
869        })
870    }
871
872    /// Count the total number of rows matching the query's WHERE and RLS conditions.
873    ///
874    /// Issues a `SELECT COUNT(*) FROM {view} WHERE {conditions}` query, ignoring
875    /// pagination (ORDER BY, LIMIT, OFFSET). Useful for REST `X-Total-Count` headers
876    /// and `count=exact` query parameter support.
877    ///
878    /// # Arguments
879    ///
880    /// * `query_match` - Pre-built query match identifying the SQL source and filters
881    /// * `variables` - Optional variables (unused for count, reserved for future use)
882    /// * `security_context` - Optional authenticated user context for RLS and inject
883    ///
884    /// # Errors
885    ///
886    /// Returns `FraiseQLError::Validation` if the query has no SQL source, or if
887    /// inject params are required but no security context is provided.
888    /// Returns `FraiseQLError::Database` if the adapter returns an error.
889    pub async fn count_rows(
890        &self,
891        query_match: &crate::runtime::matcher::QueryMatch,
892        _variables: Option<&serde_json::Value>,
893        security_context: Option<&SecurityContext>,
894    ) -> Result<u64> {
895        // 1. Evaluate RLS policy
896        let rls_where_clause: Option<RlsWhereClause> = if let (Some(ref rls_policy), Some(ctx)) =
897            (&self.config.rls_policy, security_context)
898        {
899            rls_policy.evaluate(ctx, &query_match.query_def.name)?
900        } else {
901            None
902        };
903
904        // 2. Get SQL source
905        let sql_source =
906            query_match
907                .query_def
908                .sql_source
909                .as_ref()
910                .ok_or_else(|| FraiseQLError::Validation {
911                    message: "Query has no SQL source".to_string(),
912                    path:    None,
913                })?;
914
915        // 3. Build combined WHERE clause (RLS + inject)
916        let combined_where: Option<WhereClause> = if query_match.query_def.inject_params.is_empty()
917        {
918            rls_where_clause.map(RlsWhereClause::into_where_clause)
919        } else {
920            let ctx = security_context.ok_or_else(|| FraiseQLError::Validation {
921                message: format!(
922                    "Query '{}' has inject params but no security context is available",
923                    query_match.query_def.name
924                ),
925                path:    None,
926            })?;
927            let mut conditions: Vec<WhereClause> = query_match
928                .query_def
929                .inject_params
930                .iter()
931                .map(|(col, source)| {
932                    let value = resolve_inject_value(col, source, ctx)?;
933                    Ok(inject_param_where_clause(col, value, &query_match.query_def.native_columns))
934                })
935                .collect::<Result<Vec<_>>>()?;
936
937            if let Some(rls) = rls_where_clause {
938                conditions.insert(0, rls.into_where_clause());
939            }
940            match conditions.len() {
941                0 => None,
942                1 => Some(conditions.remove(0)),
943                _ => Some(WhereClause::And(conditions)),
944            }
945        };
946
947        // 3b. Compose user-supplied WHERE when has_where is enabled (same as execute_from_match).
948        let combined_where: Option<WhereClause> = if query_match.query_def.auto_params.has_where {
949            let user_where = query_match
950                .arguments
951                .get("where")
952                .map(WhereClause::from_graphql_json)
953                .transpose()?;
954            match (combined_where, user_where) {
955                (None, None) => None,
956                (Some(sec), None) => Some(sec),
957                (None, Some(user)) => Some(user),
958                (Some(sec), Some(user)) => Some(WhereClause::And(vec![sec, user])),
959            }
960        } else {
961            combined_where
962        };
963
964        // 4. Execute COUNT query via adapter
965        let rows = self
966            .adapter
967            .execute_where_query_arc(sql_source, combined_where.as_ref(), None, None, None)
968            .await?;
969
970        // Return the row count
971        #[allow(clippy::cast_possible_truncation)] // Reason: row count fits u64
972        Ok(rows.len() as u64)
973    }
974
975    /// Execute a Relay connection query with cursor-based (keyset) pagination.
976    ///
977    /// Reads `first`, `after`, `last`, `before` from `variables`, fetches a page
978    /// of rows using `pk_{type}` keyset ordering, and wraps the result in the
979    /// Relay `XxxConnection` format:
980    /// ```json
981    /// {
982    ///   "data": {
983    ///     "users": {
984    ///       "edges": [{ "cursor": "NDI=", "node": { "id": "...", ... } }],
985    ///       "pageInfo": {
986    ///         "hasNextPage": true, "hasPreviousPage": false,
987    ///         "startCursor": "NDI=", "endCursor": "Mw=="
988    ///       }
989    ///     }
990    ///   }
991    /// }
992    /// ```
993    ///
994    /// # Errors
995    ///
996    /// Returns [`FraiseQLError::Validation`] if required pagination variables are
997    /// missing or contain invalid cursor values.
998    /// Returns [`FraiseQLError::Database`] if the SQL execution or result projection fails.
999    pub(super) async fn execute_relay_query(
1000        &self,
1001        query_match: &crate::runtime::matcher::QueryMatch,
1002        variables: Option<&serde_json::Value>,
1003        security_context: Option<&SecurityContext>,
1004    ) -> Result<serde_json::Value> {
1005        use crate::{
1006            compiler::aggregation::OrderByClause,
1007            runtime::relay::{decode_edge_cursor, decode_uuid_cursor, encode_edge_cursor},
1008            schema::CursorType,
1009        };
1010
1011        let query_def = &query_match.query_def;
1012
1013        // Guard: queries with inject params require a security context.
1014        if !query_def.inject_params.is_empty() && security_context.is_none() {
1015            return Err(FraiseQLError::Validation {
1016                message: format!(
1017                    "Query '{}' has inject params but was called without a security context",
1018                    query_def.name
1019                ),
1020                path:    None,
1021            });
1022        }
1023
1024        let sql_source =
1025            query_def.sql_source.as_deref().ok_or_else(|| FraiseQLError::Validation {
1026                message: format!("Relay query '{}' has no sql_source configured", query_def.name),
1027                path:    None,
1028            })?;
1029
1030        let cursor_column =
1031            query_def
1032                .relay_cursor_column
1033                .as_deref()
1034                .ok_or_else(|| FraiseQLError::Validation {
1035                    message: format!(
1036                        "Relay query '{}' has no relay_cursor_column derived",
1037                        query_def.name
1038                    ),
1039                    path:    None,
1040                })?;
1041
1042        // Guard: relay pagination requires the executor to have been constructed
1043        // via `Executor::new_with_relay` with a `RelayDatabaseAdapter`.
1044        let relay = self.relay.as_ref().ok_or_else(|| FraiseQLError::Validation {
1045            message: format!(
1046                "Relay pagination is not supported by the {} adapter. \
1047                 Use a relay-capable adapter (e.g. PostgreSQL) and construct \
1048                 the executor with `Executor::new_with_relay`.",
1049                self.adapter.database_type()
1050            ),
1051            path:    None,
1052        })?;
1053
1054        // --- RLS + inject_params evaluation (same logic as execute_from_match) ---
1055        // Evaluate RLS policy to generate security WHERE clause.
1056        let rls_where_clause: Option<RlsWhereClause> = if let (Some(ref rls_policy), Some(ctx)) =
1057            (&self.config.rls_policy, security_context)
1058        {
1059            rls_policy.evaluate(ctx, &query_def.name)?
1060        } else {
1061            None
1062        };
1063
1064        // Resolve inject_params from JWT claims and compose with RLS.
1065        let security_where: Option<WhereClause> = if query_def.inject_params.is_empty() {
1066            rls_where_clause.map(RlsWhereClause::into_where_clause)
1067        } else {
1068            let ctx = security_context.ok_or_else(|| FraiseQLError::Validation {
1069                message: format!(
1070                    "Query '{}' has inject params but was called without a security context",
1071                    query_def.name
1072                ),
1073                path:    None,
1074            })?;
1075            let mut conditions: Vec<WhereClause> = query_def
1076                .inject_params
1077                .iter()
1078                .map(|(col, source)| {
1079                    let value = resolve_inject_value(col, source, ctx)?;
1080                    Ok(inject_param_where_clause(col, value, &query_def.native_columns))
1081                })
1082                .collect::<Result<Vec<_>>>()?;
1083
1084            if let Some(rls) = rls_where_clause {
1085                conditions.insert(0, rls.into_where_clause());
1086            }
1087            match conditions.len() {
1088                0 => None,
1089                1 => Some(conditions.remove(0)),
1090                _ => Some(WhereClause::And(conditions)),
1091            }
1092        };
1093
1094        // Extract relay pagination arguments from variables.
1095        let vars = variables.and_then(|v| v.as_object());
1096        let first: Option<u32> = vars
1097            .and_then(|v| v.get("first"))
1098            .and_then(|v| v.as_u64())
1099            .map(|n| u32::try_from(n).unwrap_or(u32::MAX));
1100        let last: Option<u32> = vars
1101            .and_then(|v| v.get("last"))
1102            .and_then(|v| v.as_u64())
1103            .map(|n| u32::try_from(n).unwrap_or(u32::MAX));
1104        let after_cursor: Option<&str> = vars.and_then(|v| v.get("after")).and_then(|v| v.as_str());
1105        let before_cursor: Option<&str> =
1106            vars.and_then(|v| v.get("before")).and_then(|v| v.as_str());
1107
1108        // Decode base64 cursors — type depends on relay_cursor_type.
1109        // If a cursor string is provided but fails to decode, return a validation
1110        // error immediately. Silently ignoring an invalid cursor would return a
1111        // full result set, violating the client's pagination intent.
1112        let (after_pk, before_pk) =
1113            match query_def.relay_cursor_type {
1114                CursorType::Int64 => {
1115                    let after = match after_cursor {
1116                        Some(s) => Some(decode_edge_cursor(s).map(CursorValue::Int64).ok_or_else(
1117                            || FraiseQLError::Validation {
1118                                message: format!("invalid relay cursor for `after`: {s:?}"),
1119                                path:    Some("after".to_string()),
1120                            },
1121                        )?),
1122                        None => None,
1123                    };
1124                    let before = match before_cursor {
1125                        Some(s) => Some(decode_edge_cursor(s).map(CursorValue::Int64).ok_or_else(
1126                            || FraiseQLError::Validation {
1127                                message: format!("invalid relay cursor for `before`: {s:?}"),
1128                                path:    Some("before".to_string()),
1129                            },
1130                        )?),
1131                        None => None,
1132                    };
1133                    (after, before)
1134                },
1135                CursorType::Uuid => {
1136                    let after = match after_cursor {
1137                        Some(s) => {
1138                            Some(decode_uuid_cursor(s).map(CursorValue::Uuid).ok_or_else(|| {
1139                                FraiseQLError::Validation {
1140                                    message: format!("invalid relay cursor for `after`: {s:?}"),
1141                                    path:    Some("after".to_string()),
1142                                }
1143                            })?)
1144                        },
1145                        None => None,
1146                    };
1147                    let before = match before_cursor {
1148                        Some(s) => {
1149                            Some(decode_uuid_cursor(s).map(CursorValue::Uuid).ok_or_else(|| {
1150                                FraiseQLError::Validation {
1151                                    message: format!("invalid relay cursor for `before`: {s:?}"),
1152                                    path:    Some("before".to_string()),
1153                                }
1154                            })?)
1155                        },
1156                        None => None,
1157                    };
1158                    (after, before)
1159                },
1160            };
1161
1162        // Determine direction and limit.
1163        // Forward pagination takes priority; fallback to 20 if neither first/last given.
1164        let (forward, page_size) = if last.is_some() && first.is_none() {
1165            (false, last.unwrap_or(20))
1166        } else {
1167            (true, first.unwrap_or(20))
1168        };
1169
1170        // Fetch page_size + 1 rows to detect hasNextPage/hasPreviousPage.
1171        let fetch_limit = page_size + 1;
1172
1173        // Parse optional `where` filter from variables.
1174        let user_where_clause = if query_def.auto_params.has_where {
1175            vars.and_then(|v| v.get("where"))
1176                .map(WhereClause::from_graphql_json)
1177                .transpose()?
1178        } else {
1179            None
1180        };
1181
1182        // Compose final WHERE: security (RLS + inject) AND user-supplied WHERE.
1183        // Security conditions always come first so they cannot be bypassed.
1184        let combined_where = match (security_where, user_where_clause) {
1185            (None, None) => None,
1186            (Some(sec), None) => Some(sec),
1187            (None, Some(user)) => Some(user),
1188            (Some(sec), Some(user)) => Some(WhereClause::And(vec![sec, user])),
1189        };
1190
1191        // Parse optional `orderBy` from variables, enriched with schema type info.
1192        let order_by = if query_def.auto_params.has_order_by {
1193            vars.and_then(|v| v.get("orderBy"))
1194                .map(OrderByClause::from_graphql_json)
1195                .transpose()?
1196                .map(|clauses| {
1197                    enrich_order_by_clauses(
1198                        clauses,
1199                        &self.schema,
1200                        &query_def.return_type,
1201                        &query_def.native_columns,
1202                    )
1203                })
1204        } else {
1205            None
1206        };
1207
1208        // Detect whether the client selected `totalCount` inside the connection.
1209        // Named fragment spreads are already expanded by the matcher's FragmentResolver.
1210        // Inline fragments (`... on UserConnection { totalCount }`) remain as FieldSelection
1211        // entries with a name starting with "..." — we recurse one level into those.
1212        let include_total_count = query_match
1213            .selections
1214            .iter()
1215            .find(|sel| sel.name == query_def.name)
1216            .is_some_and(|connection_field| {
1217                selections_contain_field(&connection_field.nested_fields, "totalCount")
1218            });
1219
1220        // Capture before the move into execute_relay_page.
1221        let had_after = after_pk.is_some();
1222        let had_before = before_pk.is_some();
1223
1224        let result = relay
1225            .execute_relay_page(
1226                sql_source,
1227                cursor_column,
1228                after_pk,
1229                before_pk,
1230                fetch_limit,
1231                forward,
1232                combined_where.as_ref(),
1233                order_by.as_deref(),
1234                include_total_count,
1235            )
1236            .await?;
1237
1238        // Detect whether there are more pages.
1239        let has_extra = result.rows.len() > page_size as usize;
1240        let rows: Vec<_> = result.rows.into_iter().take(page_size as usize).collect();
1241
1242        let (has_next_page, has_previous_page) = if forward {
1243            (has_extra, had_after)
1244        } else {
1245            (had_before, has_extra)
1246        };
1247
1248        // Build edges: each edge has { cursor, node }.
1249        let mut edges = Vec::with_capacity(rows.len());
1250        let mut start_cursor_str: Option<String> = None;
1251        let mut end_cursor_str: Option<String> = None;
1252
1253        for (i, row) in rows.iter().enumerate() {
1254            let data = &row.data;
1255
1256            let col_val = data.as_object().and_then(|obj| obj.get(cursor_column));
1257
1258            let cursor_str = match query_def.relay_cursor_type {
1259                CursorType::Int64 => col_val
1260                    .and_then(|v| v.as_i64())
1261                    .map(encode_edge_cursor)
1262                    .ok_or_else(|| FraiseQLError::Validation {
1263                        message: format!(
1264                            "Relay query '{}': cursor column '{}' not found or not an integer in \
1265                             result JSONB. Ensure the view exposes this column inside the `data` object.",
1266                            query_def.name, cursor_column
1267                        ),
1268                        path: None,
1269                    })?,
1270                CursorType::Uuid => col_val
1271                    .and_then(|v| v.as_str())
1272                    .map(crate::runtime::relay::encode_uuid_cursor)
1273                    .ok_or_else(|| FraiseQLError::Validation {
1274                        message: format!(
1275                            "Relay query '{}': cursor column '{}' not found or not a string in \
1276                             result JSONB. Ensure the view exposes this column inside the `data` object.",
1277                            query_def.name, cursor_column
1278                        ),
1279                        path: None,
1280                    })?,
1281            };
1282
1283            if i == 0 {
1284                start_cursor_str = Some(cursor_str.clone());
1285            }
1286            end_cursor_str = Some(cursor_str.clone());
1287
1288            edges.push(serde_json::json!({
1289                "cursor": cursor_str,
1290                "node": data,
1291            }));
1292        }
1293
1294        let page_info = serde_json::json!({
1295            "hasNextPage": has_next_page,
1296            "hasPreviousPage": has_previous_page,
1297            "startCursor": start_cursor_str,
1298            "endCursor": end_cursor_str,
1299        });
1300
1301        let mut connection = serde_json::json!({
1302            "edges": edges,
1303            "pageInfo": page_info,
1304        });
1305
1306        // Include totalCount when the client requested it and the adapter provided it.
1307        if include_total_count {
1308            if let Some(count) = result.total_count {
1309                connection["totalCount"] = serde_json::json!(count);
1310            } else {
1311                connection["totalCount"] = serde_json::Value::Null;
1312            }
1313        }
1314
1315        let response = ResultProjector::wrap_in_data_envelope(connection, &query_def.name);
1316        Ok(response)
1317    }
1318
1319    /// Execute a Relay global `node(id: ID!)` query.
1320    ///
1321    /// Decodes the opaque node ID (`base64("TypeName:uuid")`), locates the
1322    /// appropriate SQL view by searching the compiled schema for a query that
1323    /// returns that type, and fetches the matching row.
1324    ///
1325    /// Returns `{ "data": { "node": <object> } }` on success, or
1326    /// `{ "data": { "node": null } }` when the object is not found.
1327    ///
1328    /// # Errors
1329    ///
1330    /// Returns `FraiseQLError::Validation` when:
1331    /// - The `id` argument is missing or malformed
1332    /// - No SQL view is registered for the requested type
1333    pub(super) async fn execute_node_query(
1334        &self,
1335        query: &str,
1336        variables: Option<&serde_json::Value>,
1337        selections: &[FieldSelection],
1338    ) -> Result<serde_json::Value> {
1339        use crate::{
1340            db::{WhereClause, where_clause::WhereOperator},
1341            runtime::relay::decode_node_id,
1342        };
1343
1344        // 1. Extract the raw opaque ID. Priority: $variables.id > inline literal in query text.
1345        let raw_id: String = if let Some(id_val) = variables
1346            .and_then(|v| v.as_object())
1347            .and_then(|obj| obj.get("id"))
1348            .and_then(|v| v.as_str())
1349        {
1350            id_val.to_string()
1351        } else {
1352            // Fall back to extracting inline literal, e.g. node(id: "NDI=")
1353            Self::extract_inline_node_id(query).ok_or_else(|| FraiseQLError::Validation {
1354                message: "node query: missing or unresolvable 'id' argument".to_string(),
1355                path:    Some("node.id".to_string()),
1356            })?
1357        };
1358
1359        // 2. Decode base64("TypeName:uuid") → (type_name, uuid).
1360        let (type_name, uuid) =
1361            decode_node_id(&raw_id).ok_or_else(|| FraiseQLError::Validation {
1362                message: format!("node query: invalid node ID '{raw_id}'"),
1363                path:    Some("node.id".to_string()),
1364            })?;
1365
1366        // 3. Find the SQL view for this type (O(1) index lookup built at startup).
1367        let sql_source: Arc<str> =
1368            self.node_type_index.get(&type_name).cloned().ok_or_else(|| {
1369                FraiseQLError::Validation {
1370                    message: format!("node query: no registered SQL view for type '{type_name}'"),
1371                    path:    Some("node.id".to_string()),
1372                }
1373            })?;
1374
1375        // 4. Build WHERE clause: data->>'id' = uuid
1376        let where_clause = WhereClause::Field {
1377            path:     vec!["id".to_string()],
1378            operator: WhereOperator::Eq,
1379            value:    serde_json::Value::String(uuid),
1380        };
1381
1382        // 5. Build projection hint from selections (mirrors regular query path).
1383        let projection_hint = if !selections.is_empty() {
1384            let typed_fields =
1385                build_typed_projection_fields(selections, &self.schema, &type_name, 0);
1386            let generator = PostgresProjectionGenerator::new();
1387            let projection_sql = generator
1388                .generate_typed_projection_sql(&typed_fields)
1389                .unwrap_or_else(|_| "data".to_string());
1390            Some(SqlProjectionHint {
1391                database:                    self.adapter.database_type(),
1392                projection_template:         projection_sql,
1393                estimated_reduction_percent: compute_projection_reduction(typed_fields.len()),
1394            })
1395        } else {
1396            None
1397        };
1398
1399        // 6. Execute the query (limit 1) with projection.
1400        let rows = self
1401            .adapter
1402            .execute_with_projection_arc(
1403                &sql_source,
1404                projection_hint.as_ref(),
1405                Some(&where_clause),
1406                Some(1),
1407                None,
1408                None,
1409            )
1410            .await?;
1411
1412        // 7. Return the first matching row (or null).
1413        // When the Arc is exclusively owned (uncached path, refcount = 1) we can move the
1414        // data out without copying.  When the cache also holds a reference (refcount ≥ 2)
1415        // we clone the single `serde_json::Value` for this one-row lookup.
1416        let node_value = Arc::try_unwrap(rows).map_or_else(
1417            |arc| arc.first().map_or(serde_json::Value::Null, |row| row.data.clone()),
1418            |v| v.into_iter().next().map_or(serde_json::Value::Null, |row| row.data),
1419        );
1420
1421        let response = ResultProjector::wrap_in_data_envelope(node_value, "node");
1422        Ok(response)
1423    }
1424}
1425
1426/// Estimate the payload reduction percentage from projecting N fields.
1427///
1428/// Uses a simple heuristic: each projected field saves proportional space
1429/// relative to a baseline of 20 typical JSONB fields per row. Clamped to
1430/// [10, 90] so the hint is never misleadingly extreme.
1431fn compute_projection_reduction(projected_field_count: usize) -> u32 {
1432    // Baseline: assume a typical type has 20 fields.
1433    const BASELINE_FIELD_COUNT: usize = 20;
1434    let requested = projected_field_count.min(BASELINE_FIELD_COUNT);
1435    let saved = BASELINE_FIELD_COUNT.saturating_sub(requested);
1436    // saved / BASELINE * 100, clamped to [10, 90]
1437    #[allow(clippy::cast_possible_truncation)] // Reason: result is in 0..=100, fits u32
1438    let percent = ((saved * 100) / BASELINE_FIELD_COUNT) as u32;
1439    percent.clamp(10, 90)
1440}
1441
1442/// Return `true` if `field_name` appears in `selections`, including inside inline
1443/// fragment entries (`FieldSelection` whose name starts with `"..."`).
1444///
1445/// Named fragment spreads are already flattened by [`FragmentResolver`] before this
1446/// is called, so we only need to recurse one level into inline fragments.
1447fn selections_contain_field(
1448    selections: &[crate::graphql::FieldSelection],
1449    field_name: &str,
1450) -> bool {
1451    for sel in selections {
1452        if sel.name == field_name {
1453            return true;
1454        }
1455        // Inline fragment: name starts with "..." (e.g. "...on UserConnection")
1456        if sel.name.starts_with("...") && selections_contain_field(&sel.nested_fields, field_name) {
1457            return true;
1458        }
1459    }
1460    false
1461}
1462
1463/// Auto-wired argument names that are handled by the `auto_params` system.
1464/// These are never treated as explicit WHERE filters.
1465const AUTO_PARAM_NAMES: &[&str] = &[
1466    "where", "limit", "offset", "orderBy", "first", "last", "after", "before",
1467];
1468
1469/// Build a `WhereClause` for a single inject param, respecting `native_columns`.
1470fn inject_param_where_clause(
1471    col: &str,
1472    value: serde_json::Value,
1473    native_columns: &std::collections::HashMap<String, String>,
1474) -> WhereClause {
1475    if let Some(pg_type) = native_columns.get(col) {
1476        WhereClause::NativeField {
1477            column: col.to_string(),
1478            pg_cast: pg_type_to_cast(pg_type).to_string(),
1479            operator: WhereOperator::Eq,
1480            value,
1481        }
1482    } else {
1483        WhereClause::Field {
1484            path: vec![col.to_string()],
1485            operator: WhereOperator::Eq,
1486            value,
1487        }
1488    }
1489}
1490
1491/// Convert PostgreSQL `information_schema.data_type` to a safe SQL cast suffix.
1492///
1493/// Returns an empty string for types that need no cast (e.g. `text`, `varchar`).
1494/// Normalise a database type name for use as the `pg_cast` hint in
1495/// `WhereClause::NativeField`.
1496///
1497/// The returned string is the **canonical PostgreSQL type name** (e.g. `"uuid"`,
1498/// `"int4"`, `"timestamp"`).  It is passed to `SqlDialect::cast_native_param`
1499/// which translates it into the dialect-appropriate cast expression:
1500/// - PostgreSQL: `$1::text::uuid`  (two-step to avoid binary wire-format mismatch)
1501/// - MySQL:      `CAST(? AS CHAR)`
1502/// - SQLite:     `CAST(? AS TEXT)`
1503/// - SQL Server: `CAST(@p1 AS UNIQUEIDENTIFIER)`
1504///
1505/// Returns `""` for text-like types that need no cast.
1506fn pg_type_to_cast(data_type: &str) -> &'static str {
1507    crate::runtime::native_columns::pg_type_to_cast(data_type)
1508}
1509
1510/// Convert explicit query arguments (e.g. `id`, `slug`, `email`) into
1511/// WHERE equality conditions and AND them onto `existing`.
1512///
1513/// Arguments whose names match auto-wired parameters (`where`, `limit`,
1514/// `offset`, `orderBy`, `first`, `last`, `after`, `before`) are skipped —
1515/// they are handled separately by the auto-params system.
1516///
1517/// When an argument has a matching entry in `native_columns`, a
1518/// `WhereClause::NativeField` is emitted (enabling B-tree index lookup via
1519/// `WHERE col = $N::type`).  Otherwise a `WhereClause::Field` is emitted
1520/// (JSONB extraction: `WHERE data->>'col' = $N`).
1521fn combine_explicit_arg_where(
1522    existing: Option<WhereClause>,
1523    defined_args: &[crate::schema::ArgumentDefinition],
1524    provided_args: &std::collections::HashMap<String, serde_json::Value>,
1525    native_columns: &std::collections::HashMap<String, String>,
1526) -> Option<WhereClause> {
1527    let explicit_conditions: Vec<WhereClause> = defined_args
1528        .iter()
1529        .filter(|arg| !AUTO_PARAM_NAMES.contains(&arg.name.as_str()))
1530        .filter_map(|arg| {
1531            provided_args.get(&arg.name).map(|value| {
1532                if let Some(pg_type) = native_columns.get(&arg.name) {
1533                    WhereClause::NativeField {
1534                        column:   arg.name.clone(),
1535                        pg_cast:  pg_type_to_cast(pg_type).to_string(),
1536                        operator: WhereOperator::Eq,
1537                        value:    value.clone(),
1538                    }
1539                } else {
1540                    WhereClause::Field {
1541                        path:     vec![arg.name.clone()],
1542                        operator: WhereOperator::Eq,
1543                        value:    value.clone(),
1544                    }
1545                }
1546            })
1547        })
1548        .collect();
1549
1550    if explicit_conditions.is_empty() {
1551        return existing;
1552    }
1553
1554    let mut all_conditions = Vec::new();
1555    if let Some(prev) = existing {
1556        all_conditions.push(prev);
1557    }
1558    all_conditions.extend(explicit_conditions);
1559
1560    match all_conditions.len() {
1561        1 => Some(all_conditions.remove(0)),
1562        _ => Some(WhereClause::And(all_conditions)),
1563    }
1564}
1565
1566#[cfg(test)]
1567mod tests {
1568    use super::*;
1569    use crate::graphql::FieldSelection;
1570
1571    // -------------------------------------------------------------------------
1572    // Helpers
1573    // -------------------------------------------------------------------------
1574
1575    fn leaf(name: &str) -> FieldSelection {
1576        FieldSelection {
1577            name:          name.to_string(),
1578            alias:         None,
1579            arguments:     vec![],
1580            nested_fields: vec![],
1581            directives:    vec![],
1582        }
1583    }
1584
1585    fn fragment(name: &str, nested: Vec<FieldSelection>) -> FieldSelection {
1586        FieldSelection {
1587            name:          name.to_string(),
1588            alias:         None,
1589            arguments:     vec![],
1590            nested_fields: nested,
1591            directives:    vec![],
1592        }
1593    }
1594
1595    // =========================================================================
1596    // compute_projection_reduction
1597    // =========================================================================
1598
1599    #[test]
1600    fn projection_reduction_zero_fields_is_clamped_to_90() {
1601        // 0 fields requested → saved = 20 → 100% → clamped to 90
1602        assert_eq!(compute_projection_reduction(0), 90);
1603    }
1604
1605    #[test]
1606    fn projection_reduction_all_fields_is_clamped_to_10() {
1607        // 20 fields (= baseline) → saved = 0 → 0% → clamped to 10
1608        assert_eq!(compute_projection_reduction(20), 10);
1609    }
1610
1611    #[test]
1612    fn projection_reduction_above_baseline_clamps_to_10() {
1613        // 50 fields > 20 baseline → same as 20 → clamped to 10
1614        assert_eq!(compute_projection_reduction(50), 10);
1615    }
1616
1617    #[test]
1618    fn projection_reduction_10_fields_is_50_percent() {
1619        // 10 requested → saved = 10 → 10/20 * 100 = 50 → within [10, 90]
1620        assert_eq!(compute_projection_reduction(10), 50);
1621    }
1622
1623    #[test]
1624    fn projection_reduction_1_field_is_high() {
1625        // 1 requested → saved = 19 → 95% → clamped to 90
1626        assert_eq!(compute_projection_reduction(1), 90);
1627    }
1628
1629    #[test]
1630    fn projection_reduction_result_always_in_clamp_range() {
1631        for n in 0_usize..=30 {
1632            let r = compute_projection_reduction(n);
1633            assert!((10..=90).contains(&r), "out of [10,90] for n={n}: got {r}");
1634        }
1635    }
1636
1637    // =========================================================================
1638    // selections_contain_field
1639    // =========================================================================
1640
1641    #[test]
1642    fn empty_selections_returns_false() {
1643        assert!(!selections_contain_field(&[], "totalCount"));
1644    }
1645
1646    #[test]
1647    fn direct_match_returns_true() {
1648        let sels = vec![leaf("edges"), leaf("totalCount"), leaf("pageInfo")];
1649        assert!(selections_contain_field(&sels, "totalCount"));
1650    }
1651
1652    #[test]
1653    fn absent_field_returns_false() {
1654        let sels = vec![leaf("edges"), leaf("pageInfo")];
1655        assert!(!selections_contain_field(&sels, "totalCount"));
1656    }
1657
1658    #[test]
1659    fn inline_fragment_nested_match_returns_true() {
1660        // "...on UserConnection" wrapping totalCount
1661        let inline = fragment("...on UserConnection", vec![leaf("totalCount"), leaf("edges")]);
1662        let sels = vec![inline];
1663        assert!(selections_contain_field(&sels, "totalCount"));
1664    }
1665
1666    #[test]
1667    fn inline_fragment_does_not_spuriously_match_fragment_name() {
1668        // The fragment entry (name "...on Foo") only matches a field named exactly "...on Foo"
1669        // when searched directly; it should NOT match an unrelated field name.
1670        let inline = fragment("...on Foo", vec![leaf("id")]);
1671        let sels = vec![inline];
1672        assert!(!selections_contain_field(&sels, "totalCount"));
1673        // "id" is nested inside the fragment and should be found via recursion
1674        assert!(selections_contain_field(&sels, "id"));
1675    }
1676
1677    #[test]
1678    fn field_not_in_fragment_returns_false() {
1679        let inline = fragment("...on UserConnection", vec![leaf("edges"), leaf("pageInfo")]);
1680        let sels = vec![inline];
1681        assert!(!selections_contain_field(&sels, "totalCount"));
1682    }
1683
1684    #[test]
1685    fn non_fragment_nested_field_not_searched() {
1686        // Only entries whose name starts with "..." trigger recursion.
1687        // A plain field's nested_fields should NOT be recursed into.
1688        let nested_count = fragment("edges", vec![leaf("totalCount")]);
1689        let sels = vec![nested_count];
1690        // "edges" doesn't start with "..." — nested fields not searched
1691        assert!(!selections_contain_field(&sels, "totalCount"));
1692    }
1693
1694    #[test]
1695    fn multiple_fragments_any_can_match() {
1696        let frag1 = fragment("...on TypeA", vec![leaf("id")]);
1697        let frag2 = fragment("...on TypeB", vec![leaf("totalCount")]);
1698        let sels = vec![frag1, frag2];
1699        assert!(selections_contain_field(&sels, "totalCount"));
1700        assert!(selections_contain_field(&sels, "id"));
1701        assert!(!selections_contain_field(&sels, "name"));
1702    }
1703
1704    #[test]
1705    fn mixed_direct_and_fragment_selections() {
1706        let inline = fragment("...on Connection", vec![leaf("pageInfo")]);
1707        let sels = vec![leaf("edges"), inline, leaf("metadata")];
1708        assert!(selections_contain_field(&sels, "edges"));
1709        assert!(selections_contain_field(&sels, "pageInfo"));
1710        assert!(selections_contain_field(&sels, "metadata"));
1711        assert!(!selections_contain_field(&sels, "cursor"));
1712    }
1713
1714    // =========================================================================
1715    // combine_explicit_arg_where
1716    // =========================================================================
1717
1718    use crate::schema::{ArgumentDefinition, FieldType};
1719
1720    fn make_arg(name: &str) -> ArgumentDefinition {
1721        ArgumentDefinition::new(name, FieldType::Id)
1722    }
1723
1724    #[test]
1725    fn no_explicit_args_returns_existing() {
1726        let existing = Some(WhereClause::Field {
1727            path:     vec!["rls".into()],
1728            operator: WhereOperator::Eq,
1729            value:    serde_json::json!("x"),
1730        });
1731        let result = combine_explicit_arg_where(
1732            existing.clone(),
1733            &[],
1734            &std::collections::HashMap::new(),
1735            &std::collections::HashMap::new(),
1736        );
1737        assert_eq!(result, existing);
1738    }
1739
1740    #[test]
1741    fn explicit_id_arg_produces_where_clause() {
1742        let args = vec![make_arg("id")];
1743        let mut provided = std::collections::HashMap::new();
1744        provided.insert("id".into(), serde_json::json!("uuid-123"));
1745
1746        let result =
1747            combine_explicit_arg_where(None, &args, &provided, &std::collections::HashMap::new());
1748        assert!(result.is_some(), "explicit id arg should produce a WHERE clause");
1749        match result.expect("just asserted Some") {
1750            WhereClause::Field {
1751                path,
1752                operator,
1753                value,
1754            } => {
1755                assert_eq!(path, vec!["id".to_string()]);
1756                assert_eq!(operator, WhereOperator::Eq);
1757                assert_eq!(value, serde_json::json!("uuid-123"));
1758            },
1759            other => panic!("expected Field, got {other:?}"),
1760        }
1761    }
1762
1763    #[test]
1764    fn auto_param_names_are_skipped() {
1765        let args = vec![
1766            make_arg("where"),
1767            make_arg("limit"),
1768            make_arg("offset"),
1769            make_arg("orderBy"),
1770            make_arg("first"),
1771            make_arg("last"),
1772            make_arg("after"),
1773            make_arg("before"),
1774            make_arg("id"),
1775        ];
1776        let mut provided = std::collections::HashMap::new();
1777        for name in &[
1778            "where", "limit", "offset", "orderBy", "first", "last", "after", "before", "id",
1779        ] {
1780            provided.insert((*name).to_string(), serde_json::json!("value"));
1781        }
1782
1783        let result =
1784            combine_explicit_arg_where(None, &args, &provided, &std::collections::HashMap::new());
1785        // Only "id" should produce a WHERE — all auto-param names are skipped
1786        match result.expect("id arg should produce WHERE") {
1787            WhereClause::Field { path, .. } => {
1788                assert_eq!(path, vec!["id".to_string()]);
1789            },
1790            other => panic!("expected single Field for 'id', got {other:?}"),
1791        }
1792    }
1793
1794    #[test]
1795    fn explicit_args_combined_with_existing_where() {
1796        let existing = WhereClause::Field {
1797            path:     vec!["rls_tenant".into()],
1798            operator: WhereOperator::Eq,
1799            value:    serde_json::json!("tenant-1"),
1800        };
1801        let args = vec![make_arg("id")];
1802        let mut provided = std::collections::HashMap::new();
1803        provided.insert("id".into(), serde_json::json!("uuid-456"));
1804
1805        let result = combine_explicit_arg_where(
1806            Some(existing),
1807            &args,
1808            &provided,
1809            &std::collections::HashMap::new(),
1810        );
1811        match result.expect("should produce combined WHERE") {
1812            WhereClause::And(conditions) => {
1813                assert_eq!(conditions.len(), 2, "should AND existing + explicit");
1814            },
1815            other => panic!("expected And, got {other:?}"),
1816        }
1817    }
1818
1819    #[test]
1820    fn unprovided_explicit_arg_is_ignored() {
1821        let args = vec![make_arg("id"), make_arg("slug")];
1822        let mut provided = std::collections::HashMap::new();
1823        // Only provide "id", not "slug"
1824        provided.insert("id".into(), serde_json::json!("uuid-789"));
1825
1826        let result =
1827            combine_explicit_arg_where(None, &args, &provided, &std::collections::HashMap::new());
1828        match result.expect("id arg should produce WHERE") {
1829            WhereClause::Field { path, .. } => {
1830                assert_eq!(path, vec!["id".to_string()]);
1831            },
1832            other => panic!("expected single Field for 'id', got {other:?}"),
1833        }
1834    }
1835
1836    // =========================================================================
1837    // pg_type_to_cast — returns canonical type names passed to SqlDialect::cast_native_param
1838    // =========================================================================
1839
1840    #[test]
1841    fn uuid_normalises_to_canonical_type_name() {
1842        assert_eq!(pg_type_to_cast("uuid"), "uuid");
1843        assert_eq!(pg_type_to_cast("UUID"), "uuid");
1844    }
1845
1846    #[test]
1847    fn integer_types_normalise_to_canonical_names() {
1848        assert_eq!(pg_type_to_cast("integer"), "int4");
1849        assert_eq!(pg_type_to_cast("int4"), "int4");
1850        assert_eq!(pg_type_to_cast("bigint"), "int8");
1851        assert_eq!(pg_type_to_cast("int8"), "int8");
1852        assert_eq!(pg_type_to_cast("smallint"), "int2");
1853        assert_eq!(pg_type_to_cast("int2"), "int2");
1854    }
1855
1856    #[test]
1857    fn float_and_numeric_types_normalise_to_canonical_names() {
1858        assert_eq!(pg_type_to_cast("numeric"), "numeric");
1859        assert_eq!(pg_type_to_cast("decimal"), "numeric");
1860        assert_eq!(pg_type_to_cast("double precision"), "float8");
1861        assert_eq!(pg_type_to_cast("float8"), "float8");
1862        assert_eq!(pg_type_to_cast("real"), "float4");
1863        assert_eq!(pg_type_to_cast("float4"), "float4");
1864    }
1865
1866    #[test]
1867    fn date_and_time_types_normalise_to_canonical_names() {
1868        assert_eq!(pg_type_to_cast("timestamp"), "timestamp");
1869        assert_eq!(pg_type_to_cast("timestamp without time zone"), "timestamp");
1870        assert_eq!(pg_type_to_cast("timestamptz"), "timestamptz");
1871        assert_eq!(pg_type_to_cast("timestamp with time zone"), "timestamptz");
1872        assert_eq!(pg_type_to_cast("date"), "date");
1873        assert_eq!(pg_type_to_cast("time"), "time");
1874        assert_eq!(pg_type_to_cast("time without time zone"), "time");
1875    }
1876
1877    #[test]
1878    fn bool_normalises_to_canonical_name() {
1879        assert_eq!(pg_type_to_cast("boolean"), "bool");
1880        assert_eq!(pg_type_to_cast("bool"), "bool");
1881    }
1882
1883    #[test]
1884    fn text_types_produce_empty_hint_meaning_no_cast() {
1885        assert_eq!(pg_type_to_cast("text"), "");
1886        assert_eq!(pg_type_to_cast("varchar"), "");
1887        assert_eq!(pg_type_to_cast("unknown_type"), "");
1888    }
1889}