Skip to main content

fraiseql_core/runtime/executor/
query.rs

1//! Regular and relay query execution.
2
3use std::sync::Arc;
4
5use super::{Executor, null_masked_fields, resolve_inject_value};
6use crate::{
7    db::{
8        CursorValue, ProjectionField, WhereClause, WhereOperator,
9        projection_generator::PostgresProjectionGenerator, traits::DatabaseAdapter,
10    },
11    error::{FraiseQLError, Result},
12    runtime::{JsonbStrategy, ResultProjector},
13    schema::SqlProjectionHint,
14    security::{RlsWhereClause, SecurityContext},
15};
16
17impl<A: DatabaseAdapter> Executor<A> {
18    /// Execute a regular query with row-level security (RLS) filtering.
19    ///
20    /// This method:
21    /// 1. Validates the user's security context (token expiration, etc.)
22    /// 2. Evaluates RLS policies to determine what rows the user can access
23    /// 3. Composes RLS filters with user-provided WHERE clauses
24    /// 4. Passes the composed filter to the database adapter for SQL-level filtering
25    ///
26    /// RLS filtering happens at the database level, not in Rust, ensuring:
27    /// - High performance (database can optimize filters)
28    /// - Correct handling of pagination (LIMIT applied after RLS filtering)
29    /// - Type-safe composition via `WhereClause` enum
30    ///
31    /// # Errors
32    ///
33    /// * [`FraiseQLError::Validation`] — security token expired, role check failed, or query not
34    ///   found in schema.
35    /// * [`FraiseQLError::Database`] — the database adapter returned an error.
36    pub(super) async fn execute_regular_query_with_security(
37        &self,
38        query: &str,
39        variables: Option<&serde_json::Value>,
40        security_context: &SecurityContext,
41    ) -> Result<String> {
42        // 1. Validate security context (check expiration, etc.)
43        if security_context.is_expired() {
44            return Err(FraiseQLError::Validation {
45                message: "Security token has expired".to_string(),
46                path:    Some("request.authorization".to_string()),
47            });
48        }
49
50        // 2. Match query to compiled template
51        let query_match = self.matcher.match_query(query, variables)?;
52
53        // 2b. Enforce requires_role — return "not found" (not "forbidden") to prevent enumeration
54        if let Some(ref required_role) = query_match.query_def.requires_role {
55            if !security_context.roles.iter().any(|r| r == required_role) {
56                return Err(FraiseQLError::Validation {
57                    message: format!("Query '{}' not found in schema", query_match.query_def.name),
58                    path:    None,
59                });
60            }
61        }
62
63        // Route relay queries to dedicated handler with security context.
64        if query_match.query_def.relay {
65            return self.execute_relay_query(&query_match, variables, Some(security_context)).await;
66        }
67
68        // 3. Create execution plan
69        let plan = self.planner.plan(&query_match)?;
70
71        // 4. Evaluate RLS policy and build WHERE clause filter. The return type is
72        //    Option<RlsWhereClause> — a compile-time proof that the clause passed through RLS
73        //    evaluation.
74        let rls_where_clause: Option<RlsWhereClause> =
75            if let Some(ref rls_policy) = self.config.rls_policy {
76                // Evaluate RLS policy with user's security context
77                rls_policy.evaluate(security_context, &query_match.query_def.name)?
78            } else {
79                // No RLS policy configured, allow all access
80                None
81            };
82
83        // 5. Get SQL source from query definition
84        let sql_source =
85            query_match
86                .query_def
87                .sql_source
88                .as_ref()
89                .ok_or_else(|| FraiseQLError::Validation {
90                    message: "Query has no SQL source".to_string(),
91                    path:    None,
92                })?;
93
94        // 6. Generate SQL projection hint for requested fields (optimization) Look up field types
95        //    from schema to use the correct JSONB operator: `->` for objects/lists (preserves
96        //    JSONB), `->>` for scalars (text).
97        let projection_hint = if !plan.projection_fields.is_empty()
98            && plan.jsonb_strategy == JsonbStrategy::Project
99        {
100            let type_def = self.schema.find_type(&query_match.query_def.return_type);
101            let typed_fields: Vec<ProjectionField> = plan
102                .projection_fields
103                .iter()
104                .map(|name| {
105                    let is_composite = type_def
106                        .and_then(|td| td.fields.iter().find(|f| f.name == name.as_str()))
107                        .is_some_and(|f| !f.field_type.is_scalar());
108                    ProjectionField {
109                        name: name.clone(),
110                        is_composite,
111                    }
112                })
113                .collect();
114
115            let generator = PostgresProjectionGenerator::new();
116            let projection_sql = generator
117                .generate_typed_projection_sql(&typed_fields)
118                .unwrap_or_else(|_| "data".to_string());
119
120            Some(SqlProjectionHint {
121                database:                    self.adapter.database_type(),
122                projection_template:         projection_sql,
123                estimated_reduction_percent: compute_projection_reduction(
124                    plan.projection_fields.len(),
125                ),
126            })
127        } else {
128            // Stream strategy: return full JSONB, no projection hint
129            None
130        };
131
132        // 7. AND inject conditions onto the RLS WHERE clause. Inject conditions always come after
133        //    RLS so they cannot bypass it.
134        let combined_where: Option<WhereClause> = if query_match.query_def.inject_params.is_empty()
135        {
136            // Common path: unwrap RlsWhereClause into WhereClause for the adapter
137            rls_where_clause.map(RlsWhereClause::into_where_clause)
138        } else {
139            let mut conditions: Vec<WhereClause> = query_match
140                .query_def
141                .inject_params
142                .iter()
143                .map(|(col, source)| {
144                    let value = resolve_inject_value(col, source, security_context)?;
145                    Ok(WhereClause::Field {
146                        path: vec![col.clone()],
147                        operator: WhereOperator::Eq,
148                        value,
149                    })
150                })
151                .collect::<Result<Vec<_>>>()?;
152
153            if let Some(rls) = rls_where_clause {
154                conditions.insert(0, rls.into_where_clause());
155            }
156            match conditions.len() {
157                0 => None,
158                1 => Some(conditions.remove(0)),
159                _ => Some(WhereClause::And(conditions)),
160            }
161        };
162
163        // 5b. Compose user-supplied WHERE from GraphQL arguments when has_where is enabled.
164        //     Security conditions (RLS + inject) are always first so they cannot be bypassed.
165        let combined_where: Option<WhereClause> = if query_match.query_def.auto_params.has_where {
166            let user_where = query_match
167                .arguments
168                .get("where")
169                .map(WhereClause::from_graphql_json)
170                .transpose()?;
171            match (combined_where, user_where) {
172                (None, None) => None,
173                (Some(sec), None) => Some(sec),
174                (None, Some(user)) => Some(user),
175                (Some(sec), Some(user)) => Some(WhereClause::And(vec![sec, user])),
176            }
177        } else {
178            combined_where
179        };
180
181        // 8. Extract limit/offset from query arguments when auto_params are enabled
182        let limit = if query_match.query_def.auto_params.has_limit {
183            query_match
184                .arguments
185                .get("limit")
186                .and_then(|v| v.as_u64())
187                .and_then(|v| u32::try_from(v).ok())
188        } else {
189            None
190        };
191
192        let offset = if query_match.query_def.auto_params.has_offset {
193            query_match
194                .arguments
195                .get("offset")
196                .and_then(|v| v.as_u64())
197                .and_then(|v| u32::try_from(v).ok())
198        } else {
199            None
200        };
201
202        // 8b. Extract order_by from query arguments when has_order_by is enabled
203        let order_by_clauses = if query_match.query_def.auto_params.has_order_by {
204            query_match
205                .arguments
206                .get("orderBy")
207                .map(crate::db::OrderByClause::from_graphql_json)
208                .transpose()?
209        } else {
210            None
211        };
212
213        // 9. Execute query with combined WHERE clause filter
214        let results = self
215            .adapter
216            .execute_with_projection(
217                sql_source,
218                projection_hint.as_ref(),
219                combined_where.as_ref(),
220                limit,
221                offset,
222                order_by_clauses.as_deref(),
223            )
224            .await?;
225
226        // 10. Apply field-level RBAC filtering (reject / mask / allow)
227        let access = self.apply_field_rbac_filtering(
228            &query_match.query_def.return_type,
229            plan.projection_fields,
230            security_context,
231        )?;
232
233        // 11. Project results — include both allowed and masked fields in projection
234        let mut all_projection_fields = access.allowed;
235        all_projection_fields.extend(access.masked.iter().cloned());
236        let projector = ResultProjector::new(all_projection_fields);
237        let mut projected =
238            projector.project_results(&results, query_match.query_def.returns_list)?;
239
240        // 11. Null out masked fields in the projected result
241        if !access.masked.is_empty() {
242            null_masked_fields(&mut projected, &access.masked);
243        }
244
245        // 12. Wrap in GraphQL data envelope
246        let response =
247            ResultProjector::wrap_in_data_envelope(projected, &query_match.query_def.name);
248
249        // 13. Serialize to JSON string
250        Ok(serde_json::to_string(&response)?)
251    }
252
253    /// Execute a regular (non-aggregate, non-relay) GraphQL query.
254    ///
255    /// # Errors
256    ///
257    /// Returns [`FraiseQLError::Validation`] if the query does not match a compiled
258    /// template or requires a security context that is not present.
259    /// Returns [`FraiseQLError::Database`] if the SQL execution or result projection fails.
260    pub(super) async fn execute_regular_query(
261        &self,
262        query: &str,
263        variables: Option<&serde_json::Value>,
264    ) -> Result<String> {
265        // 1. Match query to compiled template
266        let query_match = self.matcher.match_query(query, variables)?;
267
268        // Guard: role-restricted queries are invisible to unauthenticated users
269        if query_match.query_def.requires_role.is_some() {
270            return Err(FraiseQLError::Validation {
271                message: format!("Query '{}' not found in schema", query_match.query_def.name),
272                path:    None,
273            });
274        }
275
276        // Guard: queries with inject params require a security context.
277        if !query_match.query_def.inject_params.is_empty() {
278            return Err(FraiseQLError::Validation {
279                message: format!(
280                    "Query '{}' has inject params but was called without a security context",
281                    query_match.query_def.name
282                ),
283                path:    None,
284            });
285        }
286
287        // Route relay queries to dedicated handler.
288        if query_match.query_def.relay {
289            return self.execute_relay_query(&query_match, variables, None).await;
290        }
291
292        // 2. Create execution plan
293        let plan = self.planner.plan(&query_match)?;
294
295        // 3. Execute SQL query
296        let sql_source = query_match.query_def.sql_source.as_ref().ok_or_else(|| {
297            crate::error::FraiseQLError::Validation {
298                message: "Query has no SQL source".to_string(),
299                path:    None,
300            }
301        })?;
302
303        // 3a. Generate SQL projection hint for requested fields (optimization)
304        // Strategy selection: Project (extract fields) vs Stream (return full JSONB)
305        // This reduces payload by projecting only requested fields at the database level
306        let projection_hint = if !plan.projection_fields.is_empty()
307            && plan.jsonb_strategy == JsonbStrategy::Project
308        {
309            let generator = PostgresProjectionGenerator::new();
310            let projection_sql = generator
311                .generate_projection_sql(&plan.projection_fields)
312                .unwrap_or_else(|_| "data".to_string());
313
314            Some(SqlProjectionHint {
315                database:                    self.adapter.database_type(),
316                projection_template:         projection_sql,
317                estimated_reduction_percent: compute_projection_reduction(
318                    plan.projection_fields.len(),
319                ),
320            })
321        } else {
322            // Stream strategy: return full JSONB, no projection hint
323            None
324        };
325
326        // 3b. Extract auto_params (limit, offset, where, order_by) from arguments
327        let user_where: Option<WhereClause> = if query_match.query_def.auto_params.has_where {
328            query_match
329                .arguments
330                .get("where")
331                .map(WhereClause::from_graphql_json)
332                .transpose()?
333        } else {
334            None
335        };
336
337        let limit = if query_match.query_def.auto_params.has_limit {
338            query_match
339                .arguments
340                .get("limit")
341                .and_then(|v| v.as_u64())
342                .and_then(|v| u32::try_from(v).ok())
343        } else {
344            None
345        };
346
347        let offset = if query_match.query_def.auto_params.has_offset {
348            query_match
349                .arguments
350                .get("offset")
351                .and_then(|v| v.as_u64())
352                .and_then(|v| u32::try_from(v).ok())
353        } else {
354            None
355        };
356
357        let order_by_clauses = if query_match.query_def.auto_params.has_order_by {
358            query_match
359                .arguments
360                .get("orderBy")
361                .map(crate::db::OrderByClause::from_graphql_json)
362                .transpose()?
363        } else {
364            None
365        };
366
367        let results = self
368            .adapter
369            .execute_with_projection(
370                sql_source,
371                projection_hint.as_ref(),
372                user_where.as_ref(),
373                limit,
374                offset,
375                order_by_clauses.as_deref(),
376            )
377            .await?;
378
379        // 4. Project results
380        let projector = ResultProjector::new(plan.projection_fields);
381        let projected = projector.project_results(&results, query_match.query_def.returns_list)?;
382
383        // 5. Wrap in GraphQL data envelope
384        let response =
385            ResultProjector::wrap_in_data_envelope(projected, &query_match.query_def.name);
386
387        // 6. Serialize to JSON string
388        Ok(serde_json::to_string(&response)?)
389    }
390
391    /// Execute a pre-built `QueryMatch` directly, bypassing GraphQL string parsing.
392    ///
393    /// Used by the REST transport for embedded sub-queries and NDJSON streaming
394    /// where the query parameters are already resolved from HTTP request parameters.
395    ///
396    /// # Errors
397    ///
398    /// Returns `FraiseQLError::Validation` if the query has no SQL source.
399    /// Returns `FraiseQLError::Database` if the adapter returns an error.
400    pub async fn execute_query_direct(
401        &self,
402        query_match: &crate::runtime::matcher::QueryMatch,
403        _variables: Option<&serde_json::Value>,
404        security_context: Option<&SecurityContext>,
405    ) -> Result<String> {
406        // Evaluate RLS policy if present.
407        let rls_where_clause: Option<RlsWhereClause> = if let (Some(ref rls_policy), Some(ctx)) =
408            (&self.config.rls_policy, security_context)
409        {
410            rls_policy.evaluate(ctx, &query_match.query_def.name)?
411        } else {
412            None
413        };
414
415        // Get SQL source.
416        let sql_source =
417            query_match
418                .query_def
419                .sql_source
420                .as_ref()
421                .ok_or_else(|| FraiseQLError::Validation {
422                    message: "Query has no SQL source".to_string(),
423                    path:    None,
424                })?;
425
426        // Build execution plan.
427        let plan = self.planner.plan(query_match)?;
428
429        // Extract auto_params from arguments.
430        let user_where: Option<WhereClause> = if query_match.query_def.auto_params.has_where {
431            query_match
432                .arguments
433                .get("where")
434                .map(WhereClause::from_graphql_json)
435                .transpose()?
436        } else {
437            None
438        };
439
440        let limit = query_match
441            .arguments
442            .get("limit")
443            .and_then(|v| v.as_u64())
444            .and_then(|v| u32::try_from(v).ok());
445
446        let offset = query_match
447            .arguments
448            .get("offset")
449            .and_then(|v| v.as_u64())
450            .and_then(|v| u32::try_from(v).ok());
451
452        let order_by_clauses = query_match
453            .arguments
454            .get("orderBy")
455            .map(crate::db::OrderByClause::from_graphql_json)
456            .transpose()?;
457
458        // Compose RLS and user WHERE clauses.
459        let composed_where = match (&rls_where_clause, &user_where) {
460            (Some(rls), Some(user)) => {
461                Some(WhereClause::And(vec![rls.as_where_clause().clone(), user.clone()]))
462            },
463            (Some(rls), None) => Some(rls.as_where_clause().clone()),
464            (None, Some(user)) => Some(user.clone()),
465            (None, None) => None,
466        };
467
468        // Inject security-derived params.
469        if !query_match.query_def.inject_params.is_empty() {
470            if let Some(ctx) = security_context {
471                for (param_name, source) in &query_match.query_def.inject_params {
472                    let _value = resolve_inject_value(param_name, source, ctx)?;
473                    // Injected params are applied at the SQL level via WHERE clauses,
474                    // not via GraphQL variables, so no mutation of variables is needed here.
475                }
476            }
477        }
478
479        // Execute.
480        let results = self
481            .adapter
482            .execute_with_projection(
483                sql_source,
484                None,
485                composed_where.as_ref(),
486                limit,
487                offset,
488                order_by_clauses.as_deref(),
489            )
490            .await?;
491
492        // Project results.
493        let projector = ResultProjector::new(plan.projection_fields);
494        let projected = projector.project_results(&results, query_match.query_def.returns_list)?;
495
496        // Wrap in GraphQL data envelope.
497        let response =
498            ResultProjector::wrap_in_data_envelope(projected, &query_match.query_def.name);
499
500        Ok(serde_json::to_string(&response)?)
501    }
502
503    /// Execute a mutation with security context for REST transport.
504    ///
505    /// Delegates to the standard mutation execution path with RLS enforcement.
506    ///
507    /// # Errors
508    ///
509    /// Returns `FraiseQLError::Database` if the adapter returns an error.
510    /// Returns `FraiseQLError::Validation` if inject params require a missing security context.
511    pub async fn execute_mutation_with_security(
512        &self,
513        mutation_name: &str,
514        arguments: &serde_json::Value,
515        security_context: Option<&crate::security::SecurityContext>,
516    ) -> crate::error::Result<String> {
517        // Build a synthetic GraphQL mutation query and delegate to execute()
518        let args_str = if let Some(obj) = arguments.as_object() {
519            obj.iter().map(|(k, v)| format!("{k}: {v}")).collect::<Vec<_>>().join(", ")
520        } else {
521            String::new()
522        };
523        let query = if args_str.is_empty() {
524            format!("mutation {{ {mutation_name} {{ status entity_id message }} }}")
525        } else {
526            format!("mutation {{ {mutation_name}({args_str}) {{ status entity_id message }} }}")
527        };
528
529        if let Some(ctx) = security_context {
530            self.execute_with_security(&query, None, ctx).await
531        } else {
532            self.execute(&query, None).await
533        }
534    }
535
536    /// Execute a batch of mutations (for REST bulk insert).
537    ///
538    /// Executes each mutation individually and collects results into a `BulkResult`.
539    ///
540    /// # Errors
541    ///
542    /// Returns the first error encountered during batch execution.
543    pub async fn execute_mutation_batch(
544        &self,
545        mutation_name: &str,
546        items: &[serde_json::Value],
547        security_context: Option<&crate::security::SecurityContext>,
548    ) -> crate::error::Result<crate::runtime::BulkResult> {
549        let mut entities = Vec::with_capacity(items.len());
550        for item in items {
551            let result = self
552                .execute_mutation_with_security(mutation_name, item, security_context)
553                .await?;
554            entities.push(serde_json::Value::String(result));
555        }
556        Ok(crate::runtime::BulkResult {
557            affected_rows: entities.len() as u64,
558            entities:      Some(entities),
559        })
560    }
561
562    /// Execute a bulk operation (collection-level PATCH/DELETE) by filter.
563    ///
564    /// # Errors
565    ///
566    /// Returns `FraiseQLError::Database` if the adapter returns an error.
567    pub async fn execute_bulk_by_filter(
568        &self,
569        query_match: &crate::runtime::matcher::QueryMatch,
570        mutation_name: &str,
571        body: Option<&serde_json::Value>,
572        _id_field: &str,
573        _max_affected: u64,
574        security_context: Option<&SecurityContext>,
575    ) -> crate::error::Result<crate::runtime::BulkResult> {
576        // Execute the filter query to find matching rows.
577        let result_str = self.execute_query_direct(query_match, None, security_context).await?;
578
579        let args = body.cloned().unwrap_or(serde_json::json!({}));
580        let result = self
581            .execute_mutation_with_security(mutation_name, &args, security_context)
582            .await?;
583
584        let parsed: serde_json::Value =
585            serde_json::from_str(&result_str).unwrap_or(serde_json::json!({}));
586        let count = parsed
587            .get("data")
588            .and_then(|d| d.as_object())
589            .and_then(|o| o.values().next())
590            .and_then(|v| v.as_array())
591            .map_or(1, |a| a.len() as u64);
592
593        Ok(crate::runtime::BulkResult {
594            affected_rows: count,
595            entities:      Some(vec![serde_json::Value::String(result)]),
596        })
597    }
598
599    /// Count the total number of rows matching the query's WHERE and RLS conditions.
600    ///
601    /// Issues a `SELECT COUNT(*) FROM {view} WHERE {conditions}` query, ignoring
602    /// pagination (ORDER BY, LIMIT, OFFSET). Useful for REST `X-Total-Count` headers
603    /// and `count=exact` query parameter support.
604    ///
605    /// # Arguments
606    ///
607    /// * `query_match` - Pre-built query match identifying the SQL source and filters
608    /// * `variables` - Optional variables (unused for count, reserved for future use)
609    /// * `security_context` - Optional authenticated user context for RLS and inject
610    ///
611    /// # Errors
612    ///
613    /// Returns `FraiseQLError::Validation` if the query has no SQL source, or if
614    /// inject params are required but no security context is provided.
615    /// Returns `FraiseQLError::Database` if the adapter returns an error.
616    pub async fn count_rows(
617        &self,
618        query_match: &crate::runtime::matcher::QueryMatch,
619        _variables: Option<&serde_json::Value>,
620        security_context: Option<&SecurityContext>,
621    ) -> Result<u64> {
622        // 1. Evaluate RLS policy
623        let rls_where_clause: Option<RlsWhereClause> = if let (Some(ref rls_policy), Some(ctx)) =
624            (&self.config.rls_policy, security_context)
625        {
626            rls_policy.evaluate(ctx, &query_match.query_def.name)?
627        } else {
628            None
629        };
630
631        // 2. Get SQL source
632        let sql_source =
633            query_match
634                .query_def
635                .sql_source
636                .as_ref()
637                .ok_or_else(|| FraiseQLError::Validation {
638                    message: "Query has no SQL source".to_string(),
639                    path:    None,
640                })?;
641
642        // 3. Build combined WHERE clause (RLS + inject)
643        let combined_where: Option<WhereClause> = if query_match.query_def.inject_params.is_empty()
644        {
645            rls_where_clause.map(RlsWhereClause::into_where_clause)
646        } else {
647            let ctx = security_context.ok_or_else(|| FraiseQLError::Validation {
648                message: format!(
649                    "Query '{}' has inject params but no security context is available",
650                    query_match.query_def.name
651                ),
652                path:    None,
653            })?;
654            let mut conditions: Vec<WhereClause> = query_match
655                .query_def
656                .inject_params
657                .iter()
658                .map(|(col, source)| {
659                    let value = resolve_inject_value(col, source, ctx)?;
660                    Ok(WhereClause::Field {
661                        path: vec![col.clone()],
662                        operator: WhereOperator::Eq,
663                        value,
664                    })
665                })
666                .collect::<Result<Vec<_>>>()?;
667
668            if let Some(rls) = rls_where_clause {
669                conditions.insert(0, rls.into_where_clause());
670            }
671            match conditions.len() {
672                0 => None,
673                1 => Some(conditions.remove(0)),
674                _ => Some(WhereClause::And(conditions)),
675            }
676        };
677
678        // 3b. Compose user-supplied WHERE when has_where is enabled (same as execute_from_match).
679        let combined_where: Option<WhereClause> = if query_match.query_def.auto_params.has_where {
680            let user_where = query_match
681                .arguments
682                .get("where")
683                .map(WhereClause::from_graphql_json)
684                .transpose()?;
685            match (combined_where, user_where) {
686                (None, None) => None,
687                (Some(sec), None) => Some(sec),
688                (None, Some(user)) => Some(user),
689                (Some(sec), Some(user)) => Some(WhereClause::And(vec![sec, user])),
690            }
691        } else {
692            combined_where
693        };
694
695        // 4. Execute COUNT query via adapter
696        let rows = self
697            .adapter
698            .execute_where_query(sql_source, combined_where.as_ref(), None, None, None)
699            .await?;
700
701        // Return the row count
702        #[allow(clippy::cast_possible_truncation)] // Reason: row count fits u64
703        Ok(rows.len() as u64)
704    }
705
706    /// Execute a Relay connection query with cursor-based (keyset) pagination.
707    ///
708    /// Reads `first`, `after`, `last`, `before` from `variables`, fetches a page
709    /// of rows using `pk_{type}` keyset ordering, and wraps the result in the
710    /// Relay `XxxConnection` format:
711    /// ```json
712    /// {
713    ///   "data": {
714    ///     "users": {
715    ///       "edges": [{ "cursor": "NDI=", "node": { "id": "...", ... } }],
716    ///       "pageInfo": {
717    ///         "hasNextPage": true, "hasPreviousPage": false,
718    ///         "startCursor": "NDI=", "endCursor": "Mw=="
719    ///       }
720    ///     }
721    ///   }
722    /// }
723    /// ```
724    ///
725    /// # Errors
726    ///
727    /// Returns [`FraiseQLError::Validation`] if required pagination variables are
728    /// missing or contain invalid cursor values.
729    /// Returns [`FraiseQLError::Database`] if the SQL execution or result projection fails.
730    pub(super) async fn execute_relay_query(
731        &self,
732        query_match: &crate::runtime::matcher::QueryMatch,
733        variables: Option<&serde_json::Value>,
734        security_context: Option<&SecurityContext>,
735    ) -> Result<String> {
736        use crate::{
737            compiler::aggregation::OrderByClause,
738            runtime::relay::{decode_edge_cursor, decode_uuid_cursor, encode_edge_cursor},
739            schema::CursorType,
740        };
741
742        let query_def = &query_match.query_def;
743
744        // Guard: queries with inject params require a security context.
745        if !query_def.inject_params.is_empty() && security_context.is_none() {
746            return Err(FraiseQLError::Validation {
747                message: format!(
748                    "Query '{}' has inject params but was called without a security context",
749                    query_def.name
750                ),
751                path:    None,
752            });
753        }
754
755        let sql_source =
756            query_def.sql_source.as_deref().ok_or_else(|| FraiseQLError::Validation {
757                message: format!("Relay query '{}' has no sql_source configured", query_def.name),
758                path:    None,
759            })?;
760
761        let cursor_column =
762            query_def
763                .relay_cursor_column
764                .as_deref()
765                .ok_or_else(|| FraiseQLError::Validation {
766                    message: format!(
767                        "Relay query '{}' has no relay_cursor_column derived",
768                        query_def.name
769                    ),
770                    path:    None,
771                })?;
772
773        // Guard: relay pagination requires the executor to have been constructed
774        // via `Executor::new_with_relay` with a `RelayDatabaseAdapter`.
775        let relay = self.relay.as_ref().ok_or_else(|| FraiseQLError::Validation {
776            message: format!(
777                "Relay pagination is not supported by the {} adapter. \
778                 Use a relay-capable adapter (e.g. PostgreSQL) and construct \
779                 the executor with `Executor::new_with_relay`.",
780                self.adapter.database_type()
781            ),
782            path:    None,
783        })?;
784
785        // --- RLS + inject_params evaluation (same logic as execute_from_match) ---
786        // Evaluate RLS policy to generate security WHERE clause.
787        let rls_where_clause: Option<RlsWhereClause> = if let (Some(ref rls_policy), Some(ctx)) =
788            (&self.config.rls_policy, security_context)
789        {
790            rls_policy.evaluate(ctx, &query_def.name)?
791        } else {
792            None
793        };
794
795        // Resolve inject_params from JWT claims and compose with RLS.
796        let security_where: Option<WhereClause> = if query_def.inject_params.is_empty() {
797            rls_where_clause.map(RlsWhereClause::into_where_clause)
798        } else {
799            let ctx = security_context.ok_or_else(|| FraiseQLError::Validation {
800                message: format!(
801                    "Query '{}' has inject params but was called without a security context",
802                    query_def.name
803                ),
804                path:    None,
805            })?;
806            let mut conditions: Vec<WhereClause> = query_def
807                .inject_params
808                .iter()
809                .map(|(col, source)| {
810                    let value = resolve_inject_value(col, source, ctx)?;
811                    Ok(WhereClause::Field {
812                        path: vec![col.clone()],
813                        operator: WhereOperator::Eq,
814                        value,
815                    })
816                })
817                .collect::<Result<Vec<_>>>()?;
818
819            if let Some(rls) = rls_where_clause {
820                conditions.insert(0, rls.into_where_clause());
821            }
822            match conditions.len() {
823                0 => None,
824                1 => Some(conditions.remove(0)),
825                _ => Some(WhereClause::And(conditions)),
826            }
827        };
828
829        // Extract relay pagination arguments from variables.
830        let vars = variables.and_then(|v| v.as_object());
831        let first: Option<u32> = vars
832            .and_then(|v| v.get("first"))
833            .and_then(|v| v.as_u64())
834            .map(|n| u32::try_from(n).unwrap_or(u32::MAX));
835        let last: Option<u32> = vars
836            .and_then(|v| v.get("last"))
837            .and_then(|v| v.as_u64())
838            .map(|n| u32::try_from(n).unwrap_or(u32::MAX));
839        let after_cursor: Option<&str> = vars.and_then(|v| v.get("after")).and_then(|v| v.as_str());
840        let before_cursor: Option<&str> =
841            vars.and_then(|v| v.get("before")).and_then(|v| v.as_str());
842
843        // Decode base64 cursors — type depends on relay_cursor_type.
844        // If a cursor string is provided but fails to decode, return a validation
845        // error immediately. Silently ignoring an invalid cursor would return a
846        // full result set, violating the client's pagination intent.
847        let (after_pk, before_pk) =
848            match query_def.relay_cursor_type {
849                CursorType::Int64 => {
850                    let after = match after_cursor {
851                        Some(s) => Some(decode_edge_cursor(s).map(CursorValue::Int64).ok_or_else(
852                            || FraiseQLError::Validation {
853                                message: format!("invalid relay cursor for `after`: {s:?}"),
854                                path:    Some("after".to_string()),
855                            },
856                        )?),
857                        None => None,
858                    };
859                    let before = match before_cursor {
860                        Some(s) => Some(decode_edge_cursor(s).map(CursorValue::Int64).ok_or_else(
861                            || FraiseQLError::Validation {
862                                message: format!("invalid relay cursor for `before`: {s:?}"),
863                                path:    Some("before".to_string()),
864                            },
865                        )?),
866                        None => None,
867                    };
868                    (after, before)
869                },
870                CursorType::Uuid => {
871                    let after = match after_cursor {
872                        Some(s) => {
873                            Some(decode_uuid_cursor(s).map(CursorValue::Uuid).ok_or_else(|| {
874                                FraiseQLError::Validation {
875                                    message: format!("invalid relay cursor for `after`: {s:?}"),
876                                    path:    Some("after".to_string()),
877                                }
878                            })?)
879                        },
880                        None => None,
881                    };
882                    let before = match before_cursor {
883                        Some(s) => {
884                            Some(decode_uuid_cursor(s).map(CursorValue::Uuid).ok_or_else(|| {
885                                FraiseQLError::Validation {
886                                    message: format!("invalid relay cursor for `before`: {s:?}"),
887                                    path:    Some("before".to_string()),
888                                }
889                            })?)
890                        },
891                        None => None,
892                    };
893                    (after, before)
894                },
895            };
896
897        // Determine direction and limit.
898        // Forward pagination takes priority; fallback to 20 if neither first/last given.
899        let (forward, page_size) = if last.is_some() && first.is_none() {
900            (false, last.unwrap_or(20))
901        } else {
902            (true, first.unwrap_or(20))
903        };
904
905        // Fetch page_size + 1 rows to detect hasNextPage/hasPreviousPage.
906        let fetch_limit = page_size + 1;
907
908        // Parse optional `where` filter from variables.
909        let user_where_clause = if query_def.auto_params.has_where {
910            vars.and_then(|v| v.get("where"))
911                .map(WhereClause::from_graphql_json)
912                .transpose()?
913        } else {
914            None
915        };
916
917        // Compose final WHERE: security (RLS + inject) AND user-supplied WHERE.
918        // Security conditions always come first so they cannot be bypassed.
919        let combined_where = match (security_where, user_where_clause) {
920            (None, None) => None,
921            (Some(sec), None) => Some(sec),
922            (None, Some(user)) => Some(user),
923            (Some(sec), Some(user)) => Some(WhereClause::And(vec![sec, user])),
924        };
925
926        // Parse optional `orderBy` from variables.
927        let order_by = if query_def.auto_params.has_order_by {
928            vars.and_then(|v| v.get("orderBy"))
929                .map(OrderByClause::from_graphql_json)
930                .transpose()?
931        } else {
932            None
933        };
934
935        // Detect whether the client selected `totalCount` inside the connection.
936        // Named fragment spreads are already expanded by the matcher's FragmentResolver.
937        // Inline fragments (`... on UserConnection { totalCount }`) remain as FieldSelection
938        // entries with a name starting with "..." — we recurse one level into those.
939        let include_total_count = query_match
940            .selections
941            .iter()
942            .find(|sel| sel.name == query_def.name)
943            .is_some_and(|connection_field| {
944                selections_contain_field(&connection_field.nested_fields, "totalCount")
945            });
946
947        // Capture before the move into execute_relay_page.
948        let had_after = after_pk.is_some();
949        let had_before = before_pk.is_some();
950
951        let result = relay
952            .execute_relay_page(
953                sql_source,
954                cursor_column,
955                after_pk,
956                before_pk,
957                fetch_limit,
958                forward,
959                combined_where.as_ref(),
960                order_by.as_deref(),
961                include_total_count,
962            )
963            .await?;
964
965        // Detect whether there are more pages.
966        let has_extra = result.rows.len() > page_size as usize;
967        let rows: Vec<_> = result.rows.into_iter().take(page_size as usize).collect();
968
969        let (has_next_page, has_previous_page) = if forward {
970            (has_extra, had_after)
971        } else {
972            (had_before, has_extra)
973        };
974
975        // Build edges: each edge has { cursor, node }.
976        let mut edges = Vec::with_capacity(rows.len());
977        let mut start_cursor_str: Option<String> = None;
978        let mut end_cursor_str: Option<String> = None;
979
980        for (i, row) in rows.iter().enumerate() {
981            let data = &row.data;
982
983            let col_val = data.as_object().and_then(|obj| obj.get(cursor_column));
984
985            let cursor_str = match query_def.relay_cursor_type {
986                CursorType::Int64 => col_val
987                    .and_then(|v| v.as_i64())
988                    .map(encode_edge_cursor)
989                    .ok_or_else(|| FraiseQLError::Validation {
990                        message: format!(
991                            "Relay query '{}': cursor column '{}' not found or not an integer in \
992                             result JSONB. Ensure the view exposes this column inside the `data` object.",
993                            query_def.name, cursor_column
994                        ),
995                        path: None,
996                    })?,
997                CursorType::Uuid => col_val
998                    .and_then(|v| v.as_str())
999                    .map(crate::runtime::relay::encode_uuid_cursor)
1000                    .ok_or_else(|| FraiseQLError::Validation {
1001                        message: format!(
1002                            "Relay query '{}': cursor column '{}' not found or not a string in \
1003                             result JSONB. Ensure the view exposes this column inside the `data` object.",
1004                            query_def.name, cursor_column
1005                        ),
1006                        path: None,
1007                    })?,
1008            };
1009
1010            if i == 0 {
1011                start_cursor_str = Some(cursor_str.clone());
1012            }
1013            end_cursor_str = Some(cursor_str.clone());
1014
1015            edges.push(serde_json::json!({
1016                "cursor": cursor_str,
1017                "node": data,
1018            }));
1019        }
1020
1021        let page_info = serde_json::json!({
1022            "hasNextPage": has_next_page,
1023            "hasPreviousPage": has_previous_page,
1024            "startCursor": start_cursor_str,
1025            "endCursor": end_cursor_str,
1026        });
1027
1028        let mut connection = serde_json::json!({
1029            "edges": edges,
1030            "pageInfo": page_info,
1031        });
1032
1033        // Include totalCount when the client requested it and the adapter provided it.
1034        if include_total_count {
1035            if let Some(count) = result.total_count {
1036                connection["totalCount"] = serde_json::json!(count);
1037            } else {
1038                connection["totalCount"] = serde_json::Value::Null;
1039            }
1040        }
1041
1042        let response = ResultProjector::wrap_in_data_envelope(connection, &query_def.name);
1043        Ok(serde_json::to_string(&response)?)
1044    }
1045
1046    /// Execute a Relay global `node(id: ID!)` query.
1047    ///
1048    /// Decodes the opaque node ID (`base64("TypeName:uuid")`), locates the
1049    /// appropriate SQL view by searching the compiled schema for a query that
1050    /// returns that type, and fetches the matching row.
1051    ///
1052    /// Returns `{ "data": { "node": <object> } }` on success, or
1053    /// `{ "data": { "node": null } }` when the object is not found.
1054    ///
1055    /// # Errors
1056    ///
1057    /// Returns `FraiseQLError::Validation` when:
1058    /// - The `id` argument is missing or malformed
1059    /// - No SQL view is registered for the requested type
1060    pub(super) async fn execute_node_query(
1061        &self,
1062        query: &str,
1063        variables: Option<&serde_json::Value>,
1064    ) -> Result<String> {
1065        use crate::{
1066            db::{WhereClause, where_clause::WhereOperator},
1067            runtime::relay::decode_node_id,
1068        };
1069
1070        // 1. Extract the raw opaque ID. Priority: $variables.id > inline literal in query text.
1071        let raw_id: String = if let Some(id_val) = variables
1072            .and_then(|v| v.as_object())
1073            .and_then(|obj| obj.get("id"))
1074            .and_then(|v| v.as_str())
1075        {
1076            id_val.to_string()
1077        } else {
1078            // Fall back to extracting inline literal, e.g. node(id: "NDI=")
1079            Self::extract_inline_node_id(query).ok_or_else(|| FraiseQLError::Validation {
1080                message: "node query: missing or unresolvable 'id' argument".to_string(),
1081                path:    Some("node.id".to_string()),
1082            })?
1083        };
1084
1085        // 2. Decode base64("TypeName:uuid") → (type_name, uuid).
1086        let (type_name, uuid) =
1087            decode_node_id(&raw_id).ok_or_else(|| FraiseQLError::Validation {
1088                message: format!("node query: invalid node ID '{raw_id}'"),
1089                path:    Some("node.id".to_string()),
1090            })?;
1091
1092        // 3. Find the SQL view for this type (O(1) index lookup built at startup).
1093        let sql_source: Arc<str> =
1094            self.node_type_index.get(&type_name).cloned().ok_or_else(|| {
1095                FraiseQLError::Validation {
1096                    message: format!("node query: no registered SQL view for type '{type_name}'"),
1097                    path:    Some("node.id".to_string()),
1098                }
1099            })?;
1100
1101        // 4. Build WHERE clause: data->>'id' = uuid
1102        let where_clause = WhereClause::Field {
1103            path:     vec!["id".to_string()],
1104            operator: WhereOperator::Eq,
1105            value:    serde_json::Value::String(uuid),
1106        };
1107
1108        // 5. Execute the query (limit 1).
1109        let rows = self
1110            .adapter
1111            .execute_where_query(&sql_source, Some(&where_clause), Some(1), None, None)
1112            .await?;
1113
1114        // 6. Return the first matching row (or null).
1115        let node_value = rows.into_iter().next().map_or(serde_json::Value::Null, |row| row.data);
1116
1117        let response = ResultProjector::wrap_in_data_envelope(node_value, "node");
1118        Ok(serde_json::to_string(&response)?)
1119    }
1120}
1121
/// Estimate, as a percentage, how much payload is saved by projecting only
/// `projected_field_count` fields.
///
/// Heuristic: a typical type is assumed to carry 20 JSONB fields per row, and
/// every field left out saves an equal share of the payload. Requests for 20 or
/// more fields count as saving nothing. The result is clamped to [10, 90] so the
/// hint is never misleadingly extreme.
fn compute_projection_reduction(projected_field_count: usize) -> u32 {
    // Assumed number of fields on a typical type.
    const BASELINE_FIELD_COUNT: usize = 20;
    // Bounds applied to the reported percentage.
    const MIN_PERCENT: u32 = 10;
    const MAX_PERCENT: u32 = 90;

    // Fields left out relative to the baseline; zero once the request reaches it.
    let omitted = BASELINE_FIELD_COUNT.saturating_sub(projected_field_count);

    #[allow(clippy::cast_possible_truncation)] // Reason: the quotient is in 0..=100, fits u32
    let raw_percent = (omitted * 100 / BASELINE_FIELD_COUNT) as u32;

    raw_percent.clamp(MIN_PERCENT, MAX_PERCENT)
}
1137
1138/// Return `true` if `field_name` appears in `selections`, including inside inline
1139/// fragment entries (`FieldSelection` whose name starts with `"..."`).
1140///
1141/// Named fragment spreads are already flattened by [`FragmentResolver`] before this
1142/// is called, so we only need to recurse one level into inline fragments.
1143fn selections_contain_field(
1144    selections: &[crate::graphql::FieldSelection],
1145    field_name: &str,
1146) -> bool {
1147    for sel in selections {
1148        if sel.name == field_name {
1149            return true;
1150        }
1151        // Inline fragment: name starts with "..." (e.g. "...on UserConnection")
1152        if sel.name.starts_with("...") && selections_contain_field(&sel.nested_fields, field_name) {
1153            return true;
1154        }
1155    }
1156    false
1157}
1158
#[cfg(test)]
mod tests {
    use super::*;
    use crate::graphql::FieldSelection;

    // -------------------------------------------------------------------------
    // Fixture builders
    // -------------------------------------------------------------------------

    /// Build a selection called `name` whose children are `nested`.
    fn fragment(name: &str, nested: Vec<FieldSelection>) -> FieldSelection {
        FieldSelection {
            name:          name.to_owned(),
            alias:         None,
            arguments:     Vec::new(),
            nested_fields: nested,
            directives:    Vec::new(),
        }
    }

    /// Build a childless selection called `name`.
    fn leaf(name: &str) -> FieldSelection {
        fragment(name, Vec::new())
    }

    // =========================================================================
    // compute_projection_reduction
    // =========================================================================

    #[test]
    fn projection_reduction_zero_fields_is_clamped_to_90() {
        // Nothing requested: all 20 baseline fields saved, 100% before the clamp.
        let reduction = compute_projection_reduction(0);
        assert_eq!(reduction, 90);
    }

    #[test]
    fn projection_reduction_all_fields_is_clamped_to_10() {
        // Exactly the baseline requested: nothing saved, 0% before the clamp.
        let reduction = compute_projection_reduction(20);
        assert_eq!(reduction, 10);
    }

    #[test]
    fn projection_reduction_above_baseline_clamps_to_10() {
        // More than the baseline behaves like the baseline itself.
        let reduction = compute_projection_reduction(50);
        assert_eq!(reduction, 10);
    }

    #[test]
    fn projection_reduction_10_fields_is_50_percent() {
        // Half the baseline requested: 10 of 20 saved, 50% sits inside [10, 90].
        let reduction = compute_projection_reduction(10);
        assert_eq!(reduction, 50);
    }

    #[test]
    fn projection_reduction_1_field_is_high() {
        // One field requested: 19 of 20 saved, 95% before the clamp.
        let reduction = compute_projection_reduction(1);
        assert_eq!(reduction, 90);
    }

    #[test]
    fn projection_reduction_result_always_in_clamp_range() {
        let allowed = 10..=90;
        for n in 0_usize..=30 {
            let r = compute_projection_reduction(n);
            assert!(allowed.contains(&r), "out of [10,90] for n={n}: got {r}");
        }
    }

    // =========================================================================
    // selections_contain_field
    // =========================================================================

    #[test]
    fn empty_selections_returns_false() {
        let none: Vec<FieldSelection> = Vec::new();
        assert!(!selections_contain_field(&none, "totalCount"));
    }

    #[test]
    fn direct_match_returns_true() {
        let sels: Vec<FieldSelection> =
            ["edges", "totalCount", "pageInfo"].iter().map(|name| leaf(name)).collect();
        assert!(selections_contain_field(&sels, "totalCount"));
    }

    #[test]
    fn absent_field_returns_false() {
        let sels: Vec<FieldSelection> = ["edges", "pageInfo"].iter().map(|name| leaf(name)).collect();
        assert!(!selections_contain_field(&sels, "totalCount"));
    }

    #[test]
    fn inline_fragment_nested_match_returns_true() {
        // An inline fragment on UserConnection that wraps totalCount.
        let sels = vec![fragment(
            "...on UserConnection",
            vec![leaf("totalCount"), leaf("edges")],
        )];
        assert!(selections_contain_field(&sels, "totalCount"));
    }

    #[test]
    fn inline_fragment_does_not_spuriously_match_fragment_name() {
        // The fragment entry's own name ("...on Foo") must not make an unrelated
        // field name match.
        let sels = vec![fragment("...on Foo", vec![leaf("id")])];
        assert!(!selections_contain_field(&sels, "totalCount"));
        // "id" lives inside the fragment and is reached through recursion.
        assert!(selections_contain_field(&sels, "id"));
    }

    #[test]
    fn field_not_in_fragment_returns_false() {
        let sels = vec![fragment(
            "...on UserConnection",
            vec![leaf("edges"), leaf("pageInfo")],
        )];
        assert!(!selections_contain_field(&sels, "totalCount"));
    }

    #[test]
    fn non_fragment_nested_field_not_searched() {
        // Recursion is triggered only by names starting with "..."; the nested
        // selections of a plain field such as "edges" are left alone.
        let sels = vec![fragment("edges", vec![leaf("totalCount")])];
        assert!(!selections_contain_field(&sels, "totalCount"));
    }

    #[test]
    fn multiple_fragments_any_can_match() {
        let sels = vec![
            fragment("...on TypeA", vec![leaf("id")]),
            fragment("...on TypeB", vec![leaf("totalCount")]),
        ];
        for (field, expected) in [("totalCount", true), ("id", true), ("name", false)] {
            assert_eq!(selections_contain_field(&sels, field), expected, "field {field}");
        }
    }

    #[test]
    fn mixed_direct_and_fragment_selections() {
        let sels = vec![
            leaf("edges"),
            fragment("...on Connection", vec![leaf("pageInfo")]),
            leaf("metadata"),
        ];
        for (field, expected) in
            [("edges", true), ("pageInfo", true), ("metadata", true), ("cursor", false)]
        {
            assert_eq!(selections_contain_field(&sels, field), expected, "field {field}");
        }
    }
}