Skip to main content

fraiseql_core/runtime/executor/
query.rs

1//! Regular and relay query execution.
2
3use std::sync::Arc;
4
5use super::{Executor, null_masked_fields, resolve_inject_value};
6use crate::{
7    db::{
8        CursorValue, ProjectionField, WhereClause, WhereOperator,
9        projection_generator::PostgresProjectionGenerator, traits::DatabaseAdapter,
10    },
11    error::{FraiseQLError, Result},
12    graphql::FieldSelection,
13    runtime::{JsonbStrategy, ResultProjector},
14    schema::{CompiledSchema, SqlProjectionHint},
15    security::{RlsWhereClause, SecurityContext},
16};
17
18/// Build a recursive [`ProjectionField`] tree from a GraphQL selection set.
19///
20/// For each field in `selections`, consults the compiled schema to determine
21/// whether the field is composite (Object) or scalar, and — for Object fields —
22/// recurses into the requested sub-fields to produce a nested
23/// `jsonb_build_object(...)` at the SQL level instead of returning the full blob.
24///
25/// List fields always fall back to `data->'field'` (full blob) because
26/// sub-projection inside aggregated JSONB arrays is out of scope.
27///
28/// Recursion is capped at 4 levels, matching `MAX_PROJECTION_DEPTH` in the
29/// projection generator.
30fn build_typed_projection_fields(
31    selections: &[FieldSelection],
32    schema: &CompiledSchema,
33    parent_type_name: &str,
34    depth: usize,
35) -> Vec<ProjectionField> {
36    const MAX_DEPTH: usize = 4;
37
38    let type_def = schema.find_type(parent_type_name);
39    selections
40        .iter()
41        .map(|sel| {
42            let field_def =
43                type_def.and_then(|td| td.fields.iter().find(|f| f.name == sel.name.as_str()));
44
45            let is_composite = field_def.is_some_and(|fd| !fd.field_type.is_scalar());
46            let is_list = field_def.is_some_and(|fd| fd.field_type.is_list());
47
48            // Recurse into Object types only — List fields fall back to full blob
49            let sub_fields = if is_composite
50                && !is_list
51                && !sel.nested_fields.is_empty()
52                && depth < MAX_DEPTH
53            {
54                let child_type = field_def.and_then(|fd| fd.field_type.type_name()).unwrap_or("");
55                if child_type.is_empty() {
56                    None
57                } else {
58                    Some(build_typed_projection_fields(
59                        &sel.nested_fields,
60                        schema,
61                        child_type,
62                        depth + 1,
63                    ))
64                }
65            } else {
66                None
67            };
68
69            ProjectionField {
70                name: sel.response_key().to_string(),
71                is_composite,
72                sub_fields,
73            }
74        })
75        .collect()
76}
77
78impl<A: DatabaseAdapter> Executor<A> {
79    /// Execute a regular query with row-level security (RLS) filtering.
80    ///
81    /// This method:
82    /// 1. Validates the user's security context (token expiration, etc.)
83    /// 2. Evaluates RLS policies to determine what rows the user can access
84    /// 3. Composes RLS filters with user-provided WHERE clauses
85    /// 4. Passes the composed filter to the database adapter for SQL-level filtering
86    ///
87    /// RLS filtering happens at the database level, not in Rust, ensuring:
88    /// - High performance (database can optimize filters)
89    /// - Correct handling of pagination (LIMIT applied after RLS filtering)
90    /// - Type-safe composition via `WhereClause` enum
91    ///
92    /// # Errors
93    ///
94    /// * [`FraiseQLError::Validation`] — security token expired, role check failed, or query not
95    ///   found in schema.
96    /// * [`FraiseQLError::Database`] — the database adapter returned an error.
97    pub(super) async fn execute_regular_query_with_security(
98        &self,
99        query: &str,
100        variables: Option<&serde_json::Value>,
101        security_context: &SecurityContext,
102    ) -> Result<String> {
103        // 1. Validate security context (check expiration, etc.)
104        if security_context.is_expired() {
105            return Err(FraiseQLError::Validation {
106                message: "Security token has expired".to_string(),
107                path:    Some("request.authorization".to_string()),
108            });
109        }
110
111        // 2. Match query to compiled template
112        let query_match = self.matcher.match_query(query, variables)?;
113
114        // 2b. Enforce requires_role — return "not found" (not "forbidden") to prevent enumeration
115        if let Some(ref required_role) = query_match.query_def.requires_role {
116            if !security_context.roles.iter().any(|r| r == required_role) {
117                return Err(FraiseQLError::Validation {
118                    message: format!("Query '{}' not found in schema", query_match.query_def.name),
119                    path:    None,
120                });
121            }
122        }
123
124        // Route relay queries to dedicated handler with security context.
125        if query_match.query_def.relay {
126            return self.execute_relay_query(&query_match, variables, Some(security_context)).await;
127        }
128
129        // 3. Create execution plan
130        let plan = self.planner.plan(&query_match)?;
131
132        // 4. Evaluate RLS policy and build WHERE clause filter. The return type is
133        //    Option<RlsWhereClause> — a compile-time proof that the clause passed through RLS
134        //    evaluation.
135        let rls_where_clause: Option<RlsWhereClause> =
136            if let Some(ref rls_policy) = self.config.rls_policy {
137                // Evaluate RLS policy with user's security context
138                rls_policy.evaluate(security_context, &query_match.query_def.name)?
139            } else {
140                // No RLS policy configured, allow all access
141                None
142            };
143
144        // 5. Get SQL source from query definition
145        let sql_source =
146            query_match
147                .query_def
148                .sql_source
149                .as_ref()
150                .ok_or_else(|| FraiseQLError::Validation {
151                    message: "Query has no SQL source".to_string(),
152                    path:    None,
153                })?;
154
155        // 6. Generate SQL projection hint for requested fields (optimization).
156        //    Build a recursive ProjectionField tree from the selection set so that
157        //    composite sub-fields are projected with nested jsonb_build_object instead
158        //    of returning the full blob.
159        let projection_hint = if !plan.projection_fields.is_empty()
160            && plan.jsonb_strategy == JsonbStrategy::Project
161        {
162            let root_fields = query_match
163                .selections
164                .first()
165                .map_or(&[] as &[_], |s| s.nested_fields.as_slice());
166            let typed_fields = build_typed_projection_fields(
167                root_fields,
168                &self.schema,
169                &query_match.query_def.return_type,
170                0,
171            );
172
173            let generator = PostgresProjectionGenerator::new();
174            let projection_sql = generator
175                .generate_typed_projection_sql(&typed_fields)
176                .unwrap_or_else(|_| "data".to_string());
177
178            Some(SqlProjectionHint {
179                database:                    self.adapter.database_type(),
180                projection_template:         projection_sql,
181                estimated_reduction_percent: compute_projection_reduction(
182                    plan.projection_fields.len(),
183                ),
184            })
185        } else {
186            // Stream strategy: return full JSONB, no projection hint
187            None
188        };
189
190        // 7. AND inject conditions onto the RLS WHERE clause. Inject conditions always come after
191        //    RLS so they cannot bypass it.
192        let combined_where: Option<WhereClause> = if query_match.query_def.inject_params.is_empty()
193        {
194            // Common path: unwrap RlsWhereClause into WhereClause for the adapter
195            rls_where_clause.map(RlsWhereClause::into_where_clause)
196        } else {
197            let mut conditions: Vec<WhereClause> = query_match
198                .query_def
199                .inject_params
200                .iter()
201                .map(|(col, source)| {
202                    let value = resolve_inject_value(col, source, security_context)?;
203                    Ok(WhereClause::Field {
204                        path: vec![col.clone()],
205                        operator: WhereOperator::Eq,
206                        value,
207                    })
208                })
209                .collect::<Result<Vec<_>>>()?;
210
211            if let Some(rls) = rls_where_clause {
212                conditions.insert(0, rls.into_where_clause());
213            }
214            match conditions.len() {
215                0 => None,
216                1 => Some(conditions.remove(0)),
217                _ => Some(WhereClause::And(conditions)),
218            }
219        };
220
221        // 5b. Compose user-supplied WHERE from GraphQL arguments when has_where is enabled.
222        //     Security conditions (RLS + inject) are always first so they cannot be bypassed.
223        let combined_where: Option<WhereClause> = if query_match.query_def.auto_params.has_where {
224            let user_where = query_match
225                .arguments
226                .get("where")
227                .map(WhereClause::from_graphql_json)
228                .transpose()?;
229            match (combined_where, user_where) {
230                (None, None) => None,
231                (Some(sec), None) => Some(sec),
232                (None, Some(user)) => Some(user),
233                (Some(sec), Some(user)) => Some(WhereClause::And(vec![sec, user])),
234            }
235        } else {
236            combined_where
237        };
238
239        // 5c. Convert explicit query arguments (e.g. id, slug) to WHERE conditions.
240        //     This handles single-entity lookups like `user(id: "...")` where the
241        //     arguments are direct equality filters, not the structured `where` argument.
242        let combined_where = combine_explicit_arg_where(
243            combined_where,
244            &query_match.query_def.arguments,
245            &query_match.arguments,
246            &query_match.query_def.native_columns,
247        );
248
249        // 8. Extract limit/offset from query arguments when auto_params are enabled
250        let limit = if query_match.query_def.auto_params.has_limit {
251            query_match
252                .arguments
253                .get("limit")
254                .and_then(|v| v.as_u64())
255                .and_then(|v| u32::try_from(v).ok())
256        } else {
257            None
258        };
259
260        let offset = if query_match.query_def.auto_params.has_offset {
261            query_match
262                .arguments
263                .get("offset")
264                .and_then(|v| v.as_u64())
265                .and_then(|v| u32::try_from(v).ok())
266        } else {
267            None
268        };
269
270        // 8b. Extract order_by from query arguments when has_order_by is enabled
271        let order_by_clauses = if query_match.query_def.auto_params.has_order_by {
272            query_match
273                .arguments
274                .get("orderBy")
275                .map(crate::db::OrderByClause::from_graphql_json)
276                .transpose()?
277        } else {
278            None
279        };
280
281        // 9. Execute query with combined WHERE clause filter
282        let results = self
283            .adapter
284            .execute_with_projection_arc(
285                sql_source,
286                projection_hint.as_ref(),
287                combined_where.as_ref(),
288                limit,
289                offset,
290                order_by_clauses.as_deref(),
291            )
292            .await?;
293
294        // 10. Apply field-level RBAC filtering (reject / mask / allow)
295        let access = self.apply_field_rbac_filtering(
296            &query_match.query_def.return_type,
297            plan.projection_fields,
298            security_context,
299        )?;
300
301        // 11. Project results — include both allowed and masked fields in projection
302        let mut all_projection_fields = access.allowed;
303        all_projection_fields.extend(access.masked.iter().cloned());
304        let projector = ResultProjector::new(all_projection_fields);
305        let mut projected =
306            projector.project_results(&results, query_match.query_def.returns_list)?;
307
308        // 11. Null out masked fields in the projected result
309        if !access.masked.is_empty() {
310            null_masked_fields(&mut projected, &access.masked);
311        }
312
313        // 12. Wrap in GraphQL data envelope
314        let response =
315            ResultProjector::wrap_in_data_envelope(projected, &query_match.query_def.name);
316
317        // 13. Serialize to JSON string
318        Ok(serde_json::to_string(&response)?)
319    }
320
321    /// Execute a regular (non-aggregate, non-relay) GraphQL query.
322    ///
323    /// # Errors
324    ///
325    /// Returns [`FraiseQLError::Validation`] if the query does not match a compiled
326    /// template or requires a security context that is not present.
327    /// Returns [`FraiseQLError::Database`] if the SQL execution or result projection fails.
328    pub(super) async fn execute_regular_query(
329        &self,
330        query: &str,
331        variables: Option<&serde_json::Value>,
332    ) -> Result<String> {
333        // 1. Match query to compiled template
334        let query_match = self.matcher.match_query(query, variables)?;
335
336        // Guard: role-restricted queries are invisible to unauthenticated users
337        if query_match.query_def.requires_role.is_some() {
338            return Err(FraiseQLError::Validation {
339                message: format!("Query '{}' not found in schema", query_match.query_def.name),
340                path:    None,
341            });
342        }
343
344        // Guard: queries with inject params require a security context.
345        if !query_match.query_def.inject_params.is_empty() {
346            return Err(FraiseQLError::Validation {
347                message: format!(
348                    "Query '{}' has inject params but was called without a security context",
349                    query_match.query_def.name
350                ),
351                path:    None,
352            });
353        }
354
355        // Route relay queries to dedicated handler.
356        if query_match.query_def.relay {
357            return self.execute_relay_query(&query_match, variables, None).await;
358        }
359
360        // 2. Create execution plan
361        let plan = self.planner.plan(&query_match)?;
362
363        // 3. Execute SQL query
364        let sql_source = query_match.query_def.sql_source.as_ref().ok_or_else(|| {
365            crate::error::FraiseQLError::Validation {
366                message: "Query has no SQL source".to_string(),
367                path:    None,
368            }
369        })?;
370
371        // 3a. Generate SQL projection hint for requested fields (optimization).
372        //     Recursive typed projection: composite sub-fields are projected with nested
373        //     jsonb_build_object instead of returning the full blob.
374        let projection_hint = if !plan.projection_fields.is_empty()
375            && plan.jsonb_strategy == JsonbStrategy::Project
376        {
377            let root_fields = query_match
378                .selections
379                .first()
380                .map_or(&[] as &[_], |s| s.nested_fields.as_slice());
381            let typed_fields = build_typed_projection_fields(
382                root_fields,
383                &self.schema,
384                &query_match.query_def.return_type,
385                0,
386            );
387            let generator = PostgresProjectionGenerator::new();
388            let projection_sql = generator
389                .generate_typed_projection_sql(&typed_fields)
390                .unwrap_or_else(|_| "data".to_string());
391
392            Some(SqlProjectionHint {
393                database:                    self.adapter.database_type(),
394                projection_template:         projection_sql,
395                estimated_reduction_percent: compute_projection_reduction(
396                    plan.projection_fields.len(),
397                ),
398            })
399        } else {
400            // Stream strategy: return full JSONB, no projection hint
401            None
402        };
403
404        // 3b. Extract auto_params (limit, offset, where, order_by) from arguments
405        let user_where: Option<WhereClause> = if query_match.query_def.auto_params.has_where {
406            query_match
407                .arguments
408                .get("where")
409                .map(WhereClause::from_graphql_json)
410                .transpose()?
411        } else {
412            None
413        };
414
415        // 3c. Convert explicit query arguments (e.g. id, slug) to WHERE conditions.
416        let user_where = combine_explicit_arg_where(
417            user_where,
418            &query_match.query_def.arguments,
419            &query_match.arguments,
420            &query_match.query_def.native_columns,
421        );
422
423        let limit = if query_match.query_def.auto_params.has_limit {
424            query_match
425                .arguments
426                .get("limit")
427                .and_then(|v| v.as_u64())
428                .and_then(|v| u32::try_from(v).ok())
429        } else {
430            None
431        };
432
433        let offset = if query_match.query_def.auto_params.has_offset {
434            query_match
435                .arguments
436                .get("offset")
437                .and_then(|v| v.as_u64())
438                .and_then(|v| u32::try_from(v).ok())
439        } else {
440            None
441        };
442
443        let order_by_clauses = if query_match.query_def.auto_params.has_order_by {
444            query_match
445                .arguments
446                .get("orderBy")
447                .map(crate::db::OrderByClause::from_graphql_json)
448                .transpose()?
449        } else {
450            None
451        };
452
453        let results = self
454            .adapter
455            .execute_with_projection_arc(
456                sql_source,
457                projection_hint.as_ref(),
458                user_where.as_ref(),
459                limit,
460                offset,
461                order_by_clauses.as_deref(),
462            )
463            .await?;
464
465        // 4. Project results
466        let projector = ResultProjector::new(plan.projection_fields);
467        let projected = projector.project_results(&results, query_match.query_def.returns_list)?;
468
469        // 5. Wrap in GraphQL data envelope
470        let response =
471            ResultProjector::wrap_in_data_envelope(projected, &query_match.query_def.name);
472
473        // 6. Serialize to JSON string
474        Ok(serde_json::to_string(&response)?)
475    }
476
477    /// Execute a pre-built `QueryMatch` directly, bypassing GraphQL string parsing.
478    ///
479    /// Used by the REST transport for embedded sub-queries and NDJSON streaming
480    /// where the query parameters are already resolved from HTTP request parameters.
481    ///
482    /// # Errors
483    ///
484    /// Returns `FraiseQLError::Validation` if the query has no SQL source.
485    /// Returns `FraiseQLError::Database` if the adapter returns an error.
486    pub async fn execute_query_direct(
487        &self,
488        query_match: &crate::runtime::matcher::QueryMatch,
489        _variables: Option<&serde_json::Value>,
490        security_context: Option<&SecurityContext>,
491    ) -> Result<String> {
492        // Evaluate RLS policy if present.
493        let rls_where_clause: Option<RlsWhereClause> = if let (Some(ref rls_policy), Some(ctx)) =
494            (&self.config.rls_policy, security_context)
495        {
496            rls_policy.evaluate(ctx, &query_match.query_def.name)?
497        } else {
498            None
499        };
500
501        // Get SQL source.
502        let sql_source =
503            query_match
504                .query_def
505                .sql_source
506                .as_ref()
507                .ok_or_else(|| FraiseQLError::Validation {
508                    message: "Query has no SQL source".to_string(),
509                    path:    None,
510                })?;
511
512        // Build execution plan.
513        let plan = self.planner.plan(query_match)?;
514
515        // Extract auto_params from arguments.
516        let user_where: Option<WhereClause> = if query_match.query_def.auto_params.has_where {
517            query_match
518                .arguments
519                .get("where")
520                .map(WhereClause::from_graphql_json)
521                .transpose()?
522        } else {
523            None
524        };
525
526        let limit = query_match
527            .arguments
528            .get("limit")
529            .and_then(|v| v.as_u64())
530            .and_then(|v| u32::try_from(v).ok());
531
532        let offset = query_match
533            .arguments
534            .get("offset")
535            .and_then(|v| v.as_u64())
536            .and_then(|v| u32::try_from(v).ok());
537
538        let order_by_clauses = query_match
539            .arguments
540            .get("orderBy")
541            .map(crate::db::OrderByClause::from_graphql_json)
542            .transpose()?;
543
544        // Convert explicit arguments to WHERE conditions.
545        let user_where = combine_explicit_arg_where(
546            user_where,
547            &query_match.query_def.arguments,
548            &query_match.arguments,
549            &query_match.query_def.native_columns,
550        );
551
552        // Compose RLS and user WHERE clauses.
553        let composed_where = match (&rls_where_clause, &user_where) {
554            (Some(rls), Some(user)) => {
555                Some(WhereClause::And(vec![rls.as_where_clause().clone(), user.clone()]))
556            },
557            (Some(rls), None) => Some(rls.as_where_clause().clone()),
558            (None, Some(user)) => Some(user.clone()),
559            (None, None) => None,
560        };
561
562        // Inject security-derived params.
563        if !query_match.query_def.inject_params.is_empty() {
564            if let Some(ctx) = security_context {
565                for (param_name, source) in &query_match.query_def.inject_params {
566                    let _value = resolve_inject_value(param_name, source, ctx)?;
567                    // Injected params are applied at the SQL level via WHERE clauses,
568                    // not via GraphQL variables, so no mutation of variables is needed here.
569                }
570            }
571        }
572
573        // Execute.
574        let results = self
575            .adapter
576            .execute_with_projection_arc(
577                sql_source,
578                None,
579                composed_where.as_ref(),
580                limit,
581                offset,
582                order_by_clauses.as_deref(),
583            )
584            .await?;
585
586        // Project results.
587        let projector = ResultProjector::new(plan.projection_fields);
588        let projected = projector.project_results(&results, query_match.query_def.returns_list)?;
589
590        // Wrap in GraphQL data envelope.
591        let response =
592            ResultProjector::wrap_in_data_envelope(projected, &query_match.query_def.name);
593
594        Ok(serde_json::to_string(&response)?)
595    }
596
597    /// Execute a mutation with security context for REST transport.
598    ///
599    /// Delegates to the standard mutation execution path with RLS enforcement.
600    ///
601    /// # Errors
602    ///
603    /// Returns `FraiseQLError::Database` if the adapter returns an error.
604    /// Returns `FraiseQLError::Validation` if inject params require a missing security context.
605    pub async fn execute_mutation_with_security(
606        &self,
607        mutation_name: &str,
608        arguments: &serde_json::Value,
609        security_context: Option<&crate::security::SecurityContext>,
610    ) -> crate::error::Result<String> {
611        // Build a synthetic GraphQL mutation query and delegate to execute()
612        let args_str = if let Some(obj) = arguments.as_object() {
613            obj.iter().map(|(k, v)| format!("{k}: {v}")).collect::<Vec<_>>().join(", ")
614        } else {
615            String::new()
616        };
617        let query = if args_str.is_empty() {
618            format!("mutation {{ {mutation_name} {{ status entity_id message }} }}")
619        } else {
620            format!("mutation {{ {mutation_name}({args_str}) {{ status entity_id message }} }}")
621        };
622
623        if let Some(ctx) = security_context {
624            self.execute_with_security(&query, None, ctx).await
625        } else {
626            self.execute(&query, None).await
627        }
628    }
629
630    /// Execute a batch of mutations (for REST bulk insert).
631    ///
632    /// Executes each mutation individually and collects results into a `BulkResult`.
633    ///
634    /// # Errors
635    ///
636    /// Returns the first error encountered during batch execution.
637    pub async fn execute_mutation_batch(
638        &self,
639        mutation_name: &str,
640        items: &[serde_json::Value],
641        security_context: Option<&crate::security::SecurityContext>,
642    ) -> crate::error::Result<crate::runtime::BulkResult> {
643        let mut entities = Vec::with_capacity(items.len());
644        for item in items {
645            let result = self
646                .execute_mutation_with_security(mutation_name, item, security_context)
647                .await?;
648            entities.push(serde_json::Value::String(result));
649        }
650        Ok(crate::runtime::BulkResult {
651            affected_rows: entities.len() as u64,
652            entities:      Some(entities),
653        })
654    }
655
656    /// Execute a bulk operation (collection-level PATCH/DELETE) by filter.
657    ///
658    /// # Errors
659    ///
660    /// Returns `FraiseQLError::Database` if the adapter returns an error.
661    pub async fn execute_bulk_by_filter(
662        &self,
663        query_match: &crate::runtime::matcher::QueryMatch,
664        mutation_name: &str,
665        body: Option<&serde_json::Value>,
666        _id_field: &str,
667        _max_affected: u64,
668        security_context: Option<&SecurityContext>,
669    ) -> crate::error::Result<crate::runtime::BulkResult> {
670        // Execute the filter query to find matching rows.
671        let result_str = self.execute_query_direct(query_match, None, security_context).await?;
672
673        let args = body.cloned().unwrap_or(serde_json::json!({}));
674        let result = self
675            .execute_mutation_with_security(mutation_name, &args, security_context)
676            .await?;
677
678        let parsed: serde_json::Value =
679            serde_json::from_str(&result_str).unwrap_or(serde_json::json!({}));
680        let count = parsed
681            .get("data")
682            .and_then(|d| d.as_object())
683            .and_then(|o| o.values().next())
684            .and_then(|v| v.as_array())
685            .map_or(1, |a| a.len() as u64);
686
687        Ok(crate::runtime::BulkResult {
688            affected_rows: count,
689            entities:      Some(vec![serde_json::Value::String(result)]),
690        })
691    }
692
693    /// Count the total number of rows matching the query's WHERE and RLS conditions.
694    ///
695    /// Issues a `SELECT COUNT(*) FROM {view} WHERE {conditions}` query, ignoring
696    /// pagination (ORDER BY, LIMIT, OFFSET). Useful for REST `X-Total-Count` headers
697    /// and `count=exact` query parameter support.
698    ///
699    /// # Arguments
700    ///
701    /// * `query_match` - Pre-built query match identifying the SQL source and filters
702    /// * `variables` - Optional variables (unused for count, reserved for future use)
703    /// * `security_context` - Optional authenticated user context for RLS and inject
704    ///
705    /// # Errors
706    ///
707    /// Returns `FraiseQLError::Validation` if the query has no SQL source, or if
708    /// inject params are required but no security context is provided.
709    /// Returns `FraiseQLError::Database` if the adapter returns an error.
710    pub async fn count_rows(
711        &self,
712        query_match: &crate::runtime::matcher::QueryMatch,
713        _variables: Option<&serde_json::Value>,
714        security_context: Option<&SecurityContext>,
715    ) -> Result<u64> {
716        // 1. Evaluate RLS policy
717        let rls_where_clause: Option<RlsWhereClause> = if let (Some(ref rls_policy), Some(ctx)) =
718            (&self.config.rls_policy, security_context)
719        {
720            rls_policy.evaluate(ctx, &query_match.query_def.name)?
721        } else {
722            None
723        };
724
725        // 2. Get SQL source
726        let sql_source =
727            query_match
728                .query_def
729                .sql_source
730                .as_ref()
731                .ok_or_else(|| FraiseQLError::Validation {
732                    message: "Query has no SQL source".to_string(),
733                    path:    None,
734                })?;
735
736        // 3. Build combined WHERE clause (RLS + inject)
737        let combined_where: Option<WhereClause> = if query_match.query_def.inject_params.is_empty()
738        {
739            rls_where_clause.map(RlsWhereClause::into_where_clause)
740        } else {
741            let ctx = security_context.ok_or_else(|| FraiseQLError::Validation {
742                message: format!(
743                    "Query '{}' has inject params but no security context is available",
744                    query_match.query_def.name
745                ),
746                path:    None,
747            })?;
748            let mut conditions: Vec<WhereClause> = query_match
749                .query_def
750                .inject_params
751                .iter()
752                .map(|(col, source)| {
753                    let value = resolve_inject_value(col, source, ctx)?;
754                    Ok(WhereClause::Field {
755                        path: vec![col.clone()],
756                        operator: WhereOperator::Eq,
757                        value,
758                    })
759                })
760                .collect::<Result<Vec<_>>>()?;
761
762            if let Some(rls) = rls_where_clause {
763                conditions.insert(0, rls.into_where_clause());
764            }
765            match conditions.len() {
766                0 => None,
767                1 => Some(conditions.remove(0)),
768                _ => Some(WhereClause::And(conditions)),
769            }
770        };
771
772        // 3b. Compose user-supplied WHERE when has_where is enabled (same as execute_from_match).
773        let combined_where: Option<WhereClause> = if query_match.query_def.auto_params.has_where {
774            let user_where = query_match
775                .arguments
776                .get("where")
777                .map(WhereClause::from_graphql_json)
778                .transpose()?;
779            match (combined_where, user_where) {
780                (None, None) => None,
781                (Some(sec), None) => Some(sec),
782                (None, Some(user)) => Some(user),
783                (Some(sec), Some(user)) => Some(WhereClause::And(vec![sec, user])),
784            }
785        } else {
786            combined_where
787        };
788
789        // 4. Execute COUNT query via adapter
790        let rows = self
791            .adapter
792            .execute_where_query_arc(sql_source, combined_where.as_ref(), None, None, None)
793            .await?;
794
795        // Return the row count
796        #[allow(clippy::cast_possible_truncation)] // Reason: row count fits u64
797        Ok(rows.len() as u64)
798    }
799
800    /// Execute a Relay connection query with cursor-based (keyset) pagination.
801    ///
802    /// Reads `first`, `after`, `last`, `before` from `variables`, fetches a page
803    /// of rows using `pk_{type}` keyset ordering, and wraps the result in the
804    /// Relay `XxxConnection` format:
805    /// ```json
806    /// {
807    ///   "data": {
808    ///     "users": {
809    ///       "edges": [{ "cursor": "NDI=", "node": { "id": "...", ... } }],
810    ///       "pageInfo": {
811    ///         "hasNextPage": true, "hasPreviousPage": false,
812    ///         "startCursor": "NDI=", "endCursor": "Mw=="
813    ///       }
814    ///     }
815    ///   }
816    /// }
817    /// ```
818    ///
819    /// # Errors
820    ///
821    /// Returns [`FraiseQLError::Validation`] if required pagination variables are
822    /// missing or contain invalid cursor values.
823    /// Returns [`FraiseQLError::Database`] if the SQL execution or result projection fails.
824    pub(super) async fn execute_relay_query(
825        &self,
826        query_match: &crate::runtime::matcher::QueryMatch,
827        variables: Option<&serde_json::Value>,
828        security_context: Option<&SecurityContext>,
829    ) -> Result<String> {
830        use crate::{
831            compiler::aggregation::OrderByClause,
832            runtime::relay::{decode_edge_cursor, decode_uuid_cursor, encode_edge_cursor},
833            schema::CursorType,
834        };
835
836        let query_def = &query_match.query_def;
837
838        // Guard: queries with inject params require a security context.
839        if !query_def.inject_params.is_empty() && security_context.is_none() {
840            return Err(FraiseQLError::Validation {
841                message: format!(
842                    "Query '{}' has inject params but was called without a security context",
843                    query_def.name
844                ),
845                path:    None,
846            });
847        }
848
849        let sql_source =
850            query_def.sql_source.as_deref().ok_or_else(|| FraiseQLError::Validation {
851                message: format!("Relay query '{}' has no sql_source configured", query_def.name),
852                path:    None,
853            })?;
854
855        let cursor_column =
856            query_def
857                .relay_cursor_column
858                .as_deref()
859                .ok_or_else(|| FraiseQLError::Validation {
860                    message: format!(
861                        "Relay query '{}' has no relay_cursor_column derived",
862                        query_def.name
863                    ),
864                    path:    None,
865                })?;
866
867        // Guard: relay pagination requires the executor to have been constructed
868        // via `Executor::new_with_relay` with a `RelayDatabaseAdapter`.
869        let relay = self.relay.as_ref().ok_or_else(|| FraiseQLError::Validation {
870            message: format!(
871                "Relay pagination is not supported by the {} adapter. \
872                 Use a relay-capable adapter (e.g. PostgreSQL) and construct \
873                 the executor with `Executor::new_with_relay`.",
874                self.adapter.database_type()
875            ),
876            path:    None,
877        })?;
878
879        // --- RLS + inject_params evaluation (same logic as execute_from_match) ---
880        // Evaluate RLS policy to generate security WHERE clause.
881        let rls_where_clause: Option<RlsWhereClause> = if let (Some(ref rls_policy), Some(ctx)) =
882            (&self.config.rls_policy, security_context)
883        {
884            rls_policy.evaluate(ctx, &query_def.name)?
885        } else {
886            None
887        };
888
889        // Resolve inject_params from JWT claims and compose with RLS.
890        let security_where: Option<WhereClause> = if query_def.inject_params.is_empty() {
891            rls_where_clause.map(RlsWhereClause::into_where_clause)
892        } else {
893            let ctx = security_context.ok_or_else(|| FraiseQLError::Validation {
894                message: format!(
895                    "Query '{}' has inject params but was called without a security context",
896                    query_def.name
897                ),
898                path:    None,
899            })?;
900            let mut conditions: Vec<WhereClause> = query_def
901                .inject_params
902                .iter()
903                .map(|(col, source)| {
904                    let value = resolve_inject_value(col, source, ctx)?;
905                    Ok(WhereClause::Field {
906                        path: vec![col.clone()],
907                        operator: WhereOperator::Eq,
908                        value,
909                    })
910                })
911                .collect::<Result<Vec<_>>>()?;
912
913            if let Some(rls) = rls_where_clause {
914                conditions.insert(0, rls.into_where_clause());
915            }
916            match conditions.len() {
917                0 => None,
918                1 => Some(conditions.remove(0)),
919                _ => Some(WhereClause::And(conditions)),
920            }
921        };
922
923        // Extract relay pagination arguments from variables.
924        let vars = variables.and_then(|v| v.as_object());
925        let first: Option<u32> = vars
926            .and_then(|v| v.get("first"))
927            .and_then(|v| v.as_u64())
928            .map(|n| u32::try_from(n).unwrap_or(u32::MAX));
929        let last: Option<u32> = vars
930            .and_then(|v| v.get("last"))
931            .and_then(|v| v.as_u64())
932            .map(|n| u32::try_from(n).unwrap_or(u32::MAX));
933        let after_cursor: Option<&str> = vars.and_then(|v| v.get("after")).and_then(|v| v.as_str());
934        let before_cursor: Option<&str> =
935            vars.and_then(|v| v.get("before")).and_then(|v| v.as_str());
936
937        // Decode base64 cursors — type depends on relay_cursor_type.
938        // If a cursor string is provided but fails to decode, return a validation
939        // error immediately. Silently ignoring an invalid cursor would return a
940        // full result set, violating the client's pagination intent.
941        let (after_pk, before_pk) =
942            match query_def.relay_cursor_type {
943                CursorType::Int64 => {
944                    let after = match after_cursor {
945                        Some(s) => Some(decode_edge_cursor(s).map(CursorValue::Int64).ok_or_else(
946                            || FraiseQLError::Validation {
947                                message: format!("invalid relay cursor for `after`: {s:?}"),
948                                path:    Some("after".to_string()),
949                            },
950                        )?),
951                        None => None,
952                    };
953                    let before = match before_cursor {
954                        Some(s) => Some(decode_edge_cursor(s).map(CursorValue::Int64).ok_or_else(
955                            || FraiseQLError::Validation {
956                                message: format!("invalid relay cursor for `before`: {s:?}"),
957                                path:    Some("before".to_string()),
958                            },
959                        )?),
960                        None => None,
961                    };
962                    (after, before)
963                },
964                CursorType::Uuid => {
965                    let after = match after_cursor {
966                        Some(s) => {
967                            Some(decode_uuid_cursor(s).map(CursorValue::Uuid).ok_or_else(|| {
968                                FraiseQLError::Validation {
969                                    message: format!("invalid relay cursor for `after`: {s:?}"),
970                                    path:    Some("after".to_string()),
971                                }
972                            })?)
973                        },
974                        None => None,
975                    };
976                    let before = match before_cursor {
977                        Some(s) => {
978                            Some(decode_uuid_cursor(s).map(CursorValue::Uuid).ok_or_else(|| {
979                                FraiseQLError::Validation {
980                                    message: format!("invalid relay cursor for `before`: {s:?}"),
981                                    path:    Some("before".to_string()),
982                                }
983                            })?)
984                        },
985                        None => None,
986                    };
987                    (after, before)
988                },
989            };
990
991        // Determine direction and limit.
992        // Forward pagination takes priority; fallback to 20 if neither first/last given.
993        let (forward, page_size) = if last.is_some() && first.is_none() {
994            (false, last.unwrap_or(20))
995        } else {
996            (true, first.unwrap_or(20))
997        };
998
999        // Fetch page_size + 1 rows to detect hasNextPage/hasPreviousPage.
1000        let fetch_limit = page_size + 1;
1001
1002        // Parse optional `where` filter from variables.
1003        let user_where_clause = if query_def.auto_params.has_where {
1004            vars.and_then(|v| v.get("where"))
1005                .map(WhereClause::from_graphql_json)
1006                .transpose()?
1007        } else {
1008            None
1009        };
1010
1011        // Compose final WHERE: security (RLS + inject) AND user-supplied WHERE.
1012        // Security conditions always come first so they cannot be bypassed.
1013        let combined_where = match (security_where, user_where_clause) {
1014            (None, None) => None,
1015            (Some(sec), None) => Some(sec),
1016            (None, Some(user)) => Some(user),
1017            (Some(sec), Some(user)) => Some(WhereClause::And(vec![sec, user])),
1018        };
1019
1020        // Parse optional `orderBy` from variables.
1021        let order_by = if query_def.auto_params.has_order_by {
1022            vars.and_then(|v| v.get("orderBy"))
1023                .map(OrderByClause::from_graphql_json)
1024                .transpose()?
1025        } else {
1026            None
1027        };
1028
1029        // Detect whether the client selected `totalCount` inside the connection.
1030        // Named fragment spreads are already expanded by the matcher's FragmentResolver.
1031        // Inline fragments (`... on UserConnection { totalCount }`) remain as FieldSelection
1032        // entries with a name starting with "..." — we recurse one level into those.
1033        let include_total_count = query_match
1034            .selections
1035            .iter()
1036            .find(|sel| sel.name == query_def.name)
1037            .is_some_and(|connection_field| {
1038                selections_contain_field(&connection_field.nested_fields, "totalCount")
1039            });
1040
1041        // Capture before the move into execute_relay_page.
1042        let had_after = after_pk.is_some();
1043        let had_before = before_pk.is_some();
1044
1045        let result = relay
1046            .execute_relay_page(
1047                sql_source,
1048                cursor_column,
1049                after_pk,
1050                before_pk,
1051                fetch_limit,
1052                forward,
1053                combined_where.as_ref(),
1054                order_by.as_deref(),
1055                include_total_count,
1056            )
1057            .await?;
1058
1059        // Detect whether there are more pages.
1060        let has_extra = result.rows.len() > page_size as usize;
1061        let rows: Vec<_> = result.rows.into_iter().take(page_size as usize).collect();
1062
1063        let (has_next_page, has_previous_page) = if forward {
1064            (has_extra, had_after)
1065        } else {
1066            (had_before, has_extra)
1067        };
1068
1069        // Build edges: each edge has { cursor, node }.
1070        let mut edges = Vec::with_capacity(rows.len());
1071        let mut start_cursor_str: Option<String> = None;
1072        let mut end_cursor_str: Option<String> = None;
1073
1074        for (i, row) in rows.iter().enumerate() {
1075            let data = &row.data;
1076
1077            let col_val = data.as_object().and_then(|obj| obj.get(cursor_column));
1078
1079            let cursor_str = match query_def.relay_cursor_type {
1080                CursorType::Int64 => col_val
1081                    .and_then(|v| v.as_i64())
1082                    .map(encode_edge_cursor)
1083                    .ok_or_else(|| FraiseQLError::Validation {
1084                        message: format!(
1085                            "Relay query '{}': cursor column '{}' not found or not an integer in \
1086                             result JSONB. Ensure the view exposes this column inside the `data` object.",
1087                            query_def.name, cursor_column
1088                        ),
1089                        path: None,
1090                    })?,
1091                CursorType::Uuid => col_val
1092                    .and_then(|v| v.as_str())
1093                    .map(crate::runtime::relay::encode_uuid_cursor)
1094                    .ok_or_else(|| FraiseQLError::Validation {
1095                        message: format!(
1096                            "Relay query '{}': cursor column '{}' not found or not a string in \
1097                             result JSONB. Ensure the view exposes this column inside the `data` object.",
1098                            query_def.name, cursor_column
1099                        ),
1100                        path: None,
1101                    })?,
1102            };
1103
1104            if i == 0 {
1105                start_cursor_str = Some(cursor_str.clone());
1106            }
1107            end_cursor_str = Some(cursor_str.clone());
1108
1109            edges.push(serde_json::json!({
1110                "cursor": cursor_str,
1111                "node": data,
1112            }));
1113        }
1114
1115        let page_info = serde_json::json!({
1116            "hasNextPage": has_next_page,
1117            "hasPreviousPage": has_previous_page,
1118            "startCursor": start_cursor_str,
1119            "endCursor": end_cursor_str,
1120        });
1121
1122        let mut connection = serde_json::json!({
1123            "edges": edges,
1124            "pageInfo": page_info,
1125        });
1126
1127        // Include totalCount when the client requested it and the adapter provided it.
1128        if include_total_count {
1129            if let Some(count) = result.total_count {
1130                connection["totalCount"] = serde_json::json!(count);
1131            } else {
1132                connection["totalCount"] = serde_json::Value::Null;
1133            }
1134        }
1135
1136        let response = ResultProjector::wrap_in_data_envelope(connection, &query_def.name);
1137        Ok(serde_json::to_string(&response)?)
1138    }
1139
1140    /// Execute a Relay global `node(id: ID!)` query.
1141    ///
1142    /// Decodes the opaque node ID (`base64("TypeName:uuid")`), locates the
1143    /// appropriate SQL view by searching the compiled schema for a query that
1144    /// returns that type, and fetches the matching row.
1145    ///
1146    /// Returns `{ "data": { "node": <object> } }` on success, or
1147    /// `{ "data": { "node": null } }` when the object is not found.
1148    ///
1149    /// # Errors
1150    ///
1151    /// Returns `FraiseQLError::Validation` when:
1152    /// - The `id` argument is missing or malformed
1153    /// - No SQL view is registered for the requested type
1154    pub(super) async fn execute_node_query(
1155        &self,
1156        query: &str,
1157        variables: Option<&serde_json::Value>,
1158    ) -> Result<String> {
1159        use crate::{
1160            db::{WhereClause, where_clause::WhereOperator},
1161            runtime::relay::decode_node_id,
1162        };
1163
1164        // 1. Extract the raw opaque ID. Priority: $variables.id > inline literal in query text.
1165        let raw_id: String = if let Some(id_val) = variables
1166            .and_then(|v| v.as_object())
1167            .and_then(|obj| obj.get("id"))
1168            .and_then(|v| v.as_str())
1169        {
1170            id_val.to_string()
1171        } else {
1172            // Fall back to extracting inline literal, e.g. node(id: "NDI=")
1173            Self::extract_inline_node_id(query).ok_or_else(|| FraiseQLError::Validation {
1174                message: "node query: missing or unresolvable 'id' argument".to_string(),
1175                path:    Some("node.id".to_string()),
1176            })?
1177        };
1178
1179        // 2. Decode base64("TypeName:uuid") → (type_name, uuid).
1180        let (type_name, uuid) =
1181            decode_node_id(&raw_id).ok_or_else(|| FraiseQLError::Validation {
1182                message: format!("node query: invalid node ID '{raw_id}'"),
1183                path:    Some("node.id".to_string()),
1184            })?;
1185
1186        // 3. Find the SQL view for this type (O(1) index lookup built at startup).
1187        let sql_source: Arc<str> =
1188            self.node_type_index.get(&type_name).cloned().ok_or_else(|| {
1189                FraiseQLError::Validation {
1190                    message: format!("node query: no registered SQL view for type '{type_name}'"),
1191                    path:    Some("node.id".to_string()),
1192                }
1193            })?;
1194
1195        // 4. Build WHERE clause: data->>'id' = uuid
1196        let where_clause = WhereClause::Field {
1197            path:     vec!["id".to_string()],
1198            operator: WhereOperator::Eq,
1199            value:    serde_json::Value::String(uuid),
1200        };
1201
1202        // 5. Execute the query (limit 1).
1203        let rows = self
1204            .adapter
1205            .execute_where_query_arc(&sql_source, Some(&where_clause), Some(1), None, None)
1206            .await?;
1207
1208        // 6. Return the first matching row (or null).
1209        // When the Arc is exclusively owned (uncached path, refcount = 1) we can move the
1210        // data out without copying.  When the cache also holds a reference (refcount ≥ 2)
1211        // we clone the single `serde_json::Value` for this one-row lookup.
1212        let node_value = Arc::try_unwrap(rows)
1213            .map_or_else(
1214                |arc| arc.first().map_or(serde_json::Value::Null, |row| row.data.clone()),
1215                |v| v.into_iter().next().map_or(serde_json::Value::Null, |row| row.data),
1216            );
1217
1218        let response = ResultProjector::wrap_in_data_envelope(node_value, "node");
1219        Ok(serde_json::to_string(&response)?)
1220    }
1221}
1222
1223/// Estimate the payload reduction percentage from projecting N fields.
1224///
1225/// Uses a simple heuristic: each projected field saves proportional space
1226/// relative to a baseline of 20 typical JSONB fields per row. Clamped to
1227/// [10, 90] so the hint is never misleadingly extreme.
1228fn compute_projection_reduction(projected_field_count: usize) -> u32 {
1229    // Baseline: assume a typical type has 20 fields.
1230    const BASELINE_FIELD_COUNT: usize = 20;
1231    let requested = projected_field_count.min(BASELINE_FIELD_COUNT);
1232    let saved = BASELINE_FIELD_COUNT.saturating_sub(requested);
1233    // saved / BASELINE * 100, clamped to [10, 90]
1234    #[allow(clippy::cast_possible_truncation)] // Reason: result is in 0..=100, fits u32
1235    let percent = ((saved * 100) / BASELINE_FIELD_COUNT) as u32;
1236    percent.clamp(10, 90)
1237}
1238
1239/// Return `true` if `field_name` appears in `selections`, including inside inline
1240/// fragment entries (`FieldSelection` whose name starts with `"..."`).
1241///
1242/// Named fragment spreads are already flattened by [`FragmentResolver`] before this
1243/// is called, so we only need to recurse one level into inline fragments.
1244fn selections_contain_field(
1245    selections: &[crate::graphql::FieldSelection],
1246    field_name: &str,
1247) -> bool {
1248    for sel in selections {
1249        if sel.name == field_name {
1250            return true;
1251        }
1252        // Inline fragment: name starts with "..." (e.g. "...on UserConnection")
1253        if sel.name.starts_with("...") && selections_contain_field(&sel.nested_fields, field_name) {
1254            return true;
1255        }
1256    }
1257    false
1258}
1259
1260/// Auto-wired argument names that are handled by the `auto_params` system.
1261/// These are never treated as explicit WHERE filters.
1262const AUTO_PARAM_NAMES: &[&str] = &["where", "limit", "offset", "orderBy", "first", "last", "after", "before"];
1263
1264/// Convert PostgreSQL `information_schema.data_type` to a safe SQL cast suffix.
1265///
1266/// Returns an empty string for types that need no cast (e.g. `text`, `varchar`).
1267/// Normalise a database type name for use as the `pg_cast` hint in
1268/// `WhereClause::NativeField`.
1269///
1270/// The returned string is the **canonical PostgreSQL type name** (e.g. `"uuid"`,
1271/// `"int4"`, `"timestamp"`).  It is passed to `SqlDialect::cast_native_param`
1272/// which translates it into the dialect-appropriate cast expression:
1273/// - PostgreSQL: `$1::text::uuid`  (two-step to avoid binary wire-format mismatch)
1274/// - MySQL:      `CAST(? AS CHAR)`
1275/// - SQLite:     `CAST(? AS TEXT)`
1276/// - SQL Server: `CAST(@p1 AS UNIQUEIDENTIFIER)`
1277///
1278/// Returns `""` for text-like types that need no cast.
1279fn pg_type_to_cast(data_type: &str) -> &'static str {
1280    match data_type.to_lowercase().as_str() {
1281        "uuid" => "uuid",
1282        "integer" | "int" | "int4" => "int4",
1283        "bigint" | "int8" => "int8",
1284        "smallint" | "int2" => "int2",
1285        "boolean" | "bool" => "bool",
1286        "numeric" | "decimal" => "numeric",
1287        "double precision" | "float8" => "float8",
1288        "real" | "float4" => "float4",
1289        "timestamp without time zone" | "timestamp" => "timestamp",
1290        "timestamp with time zone" | "timestamptz" => "timestamptz",
1291        "date" => "date",
1292        "time without time zone" | "time" => "time",
1293        // text, varchar, char(n), etc. — no cast needed.
1294        _ => "",
1295    }
1296}
1297
1298/// Convert explicit query arguments (e.g. `id`, `slug`, `email`) into
1299/// WHERE equality conditions and AND them onto `existing`.
1300///
1301/// Arguments whose names match auto-wired parameters (`where`, `limit`,
1302/// `offset`, `orderBy`, `first`, `last`, `after`, `before`) are skipped —
1303/// they are handled separately by the auto-params system.
1304///
1305/// When an argument has a matching entry in `native_columns`, a
1306/// `WhereClause::NativeField` is emitted (enabling B-tree index lookup via
1307/// `WHERE col = $N::type`).  Otherwise a `WhereClause::Field` is emitted
1308/// (JSONB extraction: `WHERE data->>'col' = $N`).
1309fn combine_explicit_arg_where(
1310    existing: Option<WhereClause>,
1311    defined_args: &[crate::schema::ArgumentDefinition],
1312    provided_args: &std::collections::HashMap<String, serde_json::Value>,
1313    native_columns: &std::collections::HashMap<String, String>,
1314) -> Option<WhereClause> {
1315    let explicit_conditions: Vec<WhereClause> = defined_args
1316        .iter()
1317        .filter(|arg| !AUTO_PARAM_NAMES.contains(&arg.name.as_str()))
1318        .filter_map(|arg| {
1319            provided_args.get(&arg.name).map(|value| {
1320                if let Some(pg_type) = native_columns.get(&arg.name) {
1321                    WhereClause::NativeField {
1322                        column:   arg.name.clone(),
1323                        pg_cast:  pg_type_to_cast(pg_type).to_string(),
1324                        operator: WhereOperator::Eq,
1325                        value:    value.clone(),
1326                    }
1327                } else {
1328                    WhereClause::Field {
1329                        path:     vec![arg.name.clone()],
1330                        operator: WhereOperator::Eq,
1331                        value:    value.clone(),
1332                    }
1333                }
1334            })
1335        })
1336        .collect();
1337
1338    if explicit_conditions.is_empty() {
1339        return existing;
1340    }
1341
1342    let mut all_conditions = Vec::new();
1343    if let Some(prev) = existing {
1344        all_conditions.push(prev);
1345    }
1346    all_conditions.extend(explicit_conditions);
1347
1348    match all_conditions.len() {
1349        1 => Some(all_conditions.remove(0)),
1350        _ => Some(WhereClause::And(all_conditions)),
1351    }
1352}
1353
1354#[cfg(test)]
1355mod tests {
1356    use super::*;
1357    use crate::graphql::FieldSelection;
1358
1359    // -------------------------------------------------------------------------
1360    // Helpers
1361    // -------------------------------------------------------------------------
1362
1363    fn leaf(name: &str) -> FieldSelection {
1364        FieldSelection {
1365            name:          name.to_string(),
1366            alias:         None,
1367            arguments:     vec![],
1368            nested_fields: vec![],
1369            directives:    vec![],
1370        }
1371    }
1372
1373    fn fragment(name: &str, nested: Vec<FieldSelection>) -> FieldSelection {
1374        FieldSelection {
1375            name:          name.to_string(),
1376            alias:         None,
1377            arguments:     vec![],
1378            nested_fields: nested,
1379            directives:    vec![],
1380        }
1381    }
1382
1383    // =========================================================================
1384    // compute_projection_reduction
1385    // =========================================================================
1386
1387    #[test]
1388    fn projection_reduction_zero_fields_is_clamped_to_90() {
1389        // 0 fields requested → saved = 20 → 100% → clamped to 90
1390        assert_eq!(compute_projection_reduction(0), 90);
1391    }
1392
1393    #[test]
1394    fn projection_reduction_all_fields_is_clamped_to_10() {
1395        // 20 fields (= baseline) → saved = 0 → 0% → clamped to 10
1396        assert_eq!(compute_projection_reduction(20), 10);
1397    }
1398
1399    #[test]
1400    fn projection_reduction_above_baseline_clamps_to_10() {
1401        // 50 fields > 20 baseline → same as 20 → clamped to 10
1402        assert_eq!(compute_projection_reduction(50), 10);
1403    }
1404
1405    #[test]
1406    fn projection_reduction_10_fields_is_50_percent() {
1407        // 10 requested → saved = 10 → 10/20 * 100 = 50 → within [10, 90]
1408        assert_eq!(compute_projection_reduction(10), 50);
1409    }
1410
1411    #[test]
1412    fn projection_reduction_1_field_is_high() {
1413        // 1 requested → saved = 19 → 95% → clamped to 90
1414        assert_eq!(compute_projection_reduction(1), 90);
1415    }
1416
1417    #[test]
1418    fn projection_reduction_result_always_in_clamp_range() {
1419        for n in 0_usize..=30 {
1420            let r = compute_projection_reduction(n);
1421            assert!((10..=90).contains(&r), "out of [10,90] for n={n}: got {r}");
1422        }
1423    }
1424
1425    // =========================================================================
1426    // selections_contain_field
1427    // =========================================================================
1428
1429    #[test]
1430    fn empty_selections_returns_false() {
1431        assert!(!selections_contain_field(&[], "totalCount"));
1432    }
1433
1434    #[test]
1435    fn direct_match_returns_true() {
1436        let sels = vec![leaf("edges"), leaf("totalCount"), leaf("pageInfo")];
1437        assert!(selections_contain_field(&sels, "totalCount"));
1438    }
1439
1440    #[test]
1441    fn absent_field_returns_false() {
1442        let sels = vec![leaf("edges"), leaf("pageInfo")];
1443        assert!(!selections_contain_field(&sels, "totalCount"));
1444    }
1445
1446    #[test]
1447    fn inline_fragment_nested_match_returns_true() {
1448        // "...on UserConnection" wrapping totalCount
1449        let inline = fragment("...on UserConnection", vec![leaf("totalCount"), leaf("edges")]);
1450        let sels = vec![inline];
1451        assert!(selections_contain_field(&sels, "totalCount"));
1452    }
1453
1454    #[test]
1455    fn inline_fragment_does_not_spuriously_match_fragment_name() {
1456        // The fragment entry (name "...on Foo") only matches a field named exactly "...on Foo"
1457        // when searched directly; it should NOT match an unrelated field name.
1458        let inline = fragment("...on Foo", vec![leaf("id")]);
1459        let sels = vec![inline];
1460        assert!(!selections_contain_field(&sels, "totalCount"));
1461        // "id" is nested inside the fragment and should be found via recursion
1462        assert!(selections_contain_field(&sels, "id"));
1463    }
1464
1465    #[test]
1466    fn field_not_in_fragment_returns_false() {
1467        let inline = fragment("...on UserConnection", vec![leaf("edges"), leaf("pageInfo")]);
1468        let sels = vec![inline];
1469        assert!(!selections_contain_field(&sels, "totalCount"));
1470    }
1471
1472    #[test]
1473    fn non_fragment_nested_field_not_searched() {
1474        // Only entries whose name starts with "..." trigger recursion.
1475        // A plain field's nested_fields should NOT be recursed into.
1476        let nested_count = fragment("edges", vec![leaf("totalCount")]);
1477        let sels = vec![nested_count];
1478        // "edges" doesn't start with "..." — nested fields not searched
1479        assert!(!selections_contain_field(&sels, "totalCount"));
1480    }
1481
1482    #[test]
1483    fn multiple_fragments_any_can_match() {
1484        let frag1 = fragment("...on TypeA", vec![leaf("id")]);
1485        let frag2 = fragment("...on TypeB", vec![leaf("totalCount")]);
1486        let sels = vec![frag1, frag2];
1487        assert!(selections_contain_field(&sels, "totalCount"));
1488        assert!(selections_contain_field(&sels, "id"));
1489        assert!(!selections_contain_field(&sels, "name"));
1490    }
1491
1492    #[test]
1493    fn mixed_direct_and_fragment_selections() {
1494        let inline = fragment("...on Connection", vec![leaf("pageInfo")]);
1495        let sels = vec![leaf("edges"), inline, leaf("metadata")];
1496        assert!(selections_contain_field(&sels, "edges"));
1497        assert!(selections_contain_field(&sels, "pageInfo"));
1498        assert!(selections_contain_field(&sels, "metadata"));
1499        assert!(!selections_contain_field(&sels, "cursor"));
1500    }
1501
1502    // =========================================================================
1503    // combine_explicit_arg_where
1504    // =========================================================================
1505
1506    use crate::schema::{ArgumentDefinition, FieldType};
1507
1508    fn make_arg(name: &str) -> ArgumentDefinition {
1509        ArgumentDefinition::new(name, FieldType::Id)
1510    }
1511
1512    #[test]
1513    fn no_explicit_args_returns_existing() {
1514        let existing = Some(WhereClause::Field {
1515            path:     vec!["rls".into()],
1516            operator: WhereOperator::Eq,
1517            value:    serde_json::json!("x"),
1518        });
1519        let result = combine_explicit_arg_where(existing.clone(), &[], &std::collections::HashMap::new(), &std::collections::HashMap::new());
1520        assert_eq!(result, existing);
1521    }
1522
1523    #[test]
1524    fn explicit_id_arg_produces_where_clause() {
1525        let args = vec![make_arg("id")];
1526        let mut provided = std::collections::HashMap::new();
1527        provided.insert("id".into(), serde_json::json!("uuid-123"));
1528
1529        let result = combine_explicit_arg_where(None, &args, &provided, &std::collections::HashMap::new());
1530        assert!(result.is_some(), "explicit id arg should produce a WHERE clause");
1531        match result.expect("just asserted Some") {
1532            WhereClause::Field { path, operator, value } => {
1533                assert_eq!(path, vec!["id".to_string()]);
1534                assert_eq!(operator, WhereOperator::Eq);
1535                assert_eq!(value, serde_json::json!("uuid-123"));
1536            },
1537            other => panic!("expected Field, got {other:?}"),
1538        }
1539    }
1540
1541    #[test]
1542    fn auto_param_names_are_skipped() {
1543        let args = vec![
1544            make_arg("where"),
1545            make_arg("limit"),
1546            make_arg("offset"),
1547            make_arg("orderBy"),
1548            make_arg("first"),
1549            make_arg("last"),
1550            make_arg("after"),
1551            make_arg("before"),
1552            make_arg("id"),
1553        ];
1554        let mut provided = std::collections::HashMap::new();
1555        for name in &["where", "limit", "offset", "orderBy", "first", "last", "after", "before", "id"] {
1556            provided.insert((*name).to_string(), serde_json::json!("value"));
1557        }
1558
1559        let result = combine_explicit_arg_where(None, &args, &provided, &std::collections::HashMap::new());
1560        // Only "id" should produce a WHERE — all auto-param names are skipped
1561        match result.expect("id arg should produce WHERE") {
1562            WhereClause::Field { path, .. } => {
1563                assert_eq!(path, vec!["id".to_string()]);
1564            },
1565            other => panic!("expected single Field for 'id', got {other:?}"),
1566        }
1567    }
1568
1569    #[test]
1570    fn explicit_args_combined_with_existing_where() {
1571        let existing = WhereClause::Field {
1572            path:     vec!["rls_tenant".into()],
1573            operator: WhereOperator::Eq,
1574            value:    serde_json::json!("tenant-1"),
1575        };
1576        let args = vec![make_arg("id")];
1577        let mut provided = std::collections::HashMap::new();
1578        provided.insert("id".into(), serde_json::json!("uuid-456"));
1579
1580        let result = combine_explicit_arg_where(Some(existing), &args, &provided, &std::collections::HashMap::new());
1581        match result.expect("should produce combined WHERE") {
1582            WhereClause::And(conditions) => {
1583                assert_eq!(conditions.len(), 2, "should AND existing + explicit");
1584            },
1585            other => panic!("expected And, got {other:?}"),
1586        }
1587    }
1588
1589    #[test]
1590    fn unprovided_explicit_arg_is_ignored() {
1591        let args = vec![make_arg("id"), make_arg("slug")];
1592        let mut provided = std::collections::HashMap::new();
1593        // Only provide "id", not "slug"
1594        provided.insert("id".into(), serde_json::json!("uuid-789"));
1595
1596        let result = combine_explicit_arg_where(None, &args, &provided, &std::collections::HashMap::new());
1597        match result.expect("id arg should produce WHERE") {
1598            WhereClause::Field { path, .. } => {
1599                assert_eq!(path, vec!["id".to_string()]);
1600            },
1601            other => panic!("expected single Field for 'id', got {other:?}"),
1602        }
1603    }
1604
1605    // =========================================================================
1606    // pg_type_to_cast — returns canonical type names passed to SqlDialect::cast_native_param
1607    // =========================================================================
1608
1609    #[test]
1610    fn uuid_normalises_to_canonical_type_name() {
1611        assert_eq!(pg_type_to_cast("uuid"), "uuid");
1612        assert_eq!(pg_type_to_cast("UUID"), "uuid");
1613    }
1614
1615    #[test]
1616    fn integer_types_normalise_to_canonical_names() {
1617        assert_eq!(pg_type_to_cast("integer"), "int4");
1618        assert_eq!(pg_type_to_cast("int4"), "int4");
1619        assert_eq!(pg_type_to_cast("bigint"), "int8");
1620        assert_eq!(pg_type_to_cast("int8"), "int8");
1621        assert_eq!(pg_type_to_cast("smallint"), "int2");
1622        assert_eq!(pg_type_to_cast("int2"), "int2");
1623    }
1624
1625    #[test]
1626    fn float_and_numeric_types_normalise_to_canonical_names() {
1627        assert_eq!(pg_type_to_cast("numeric"), "numeric");
1628        assert_eq!(pg_type_to_cast("decimal"), "numeric");
1629        assert_eq!(pg_type_to_cast("double precision"), "float8");
1630        assert_eq!(pg_type_to_cast("float8"), "float8");
1631        assert_eq!(pg_type_to_cast("real"), "float4");
1632        assert_eq!(pg_type_to_cast("float4"), "float4");
1633    }
1634
1635    #[test]
1636    fn date_and_time_types_normalise_to_canonical_names() {
1637        assert_eq!(pg_type_to_cast("timestamp"), "timestamp");
1638        assert_eq!(pg_type_to_cast("timestamp without time zone"), "timestamp");
1639        assert_eq!(pg_type_to_cast("timestamptz"), "timestamptz");
1640        assert_eq!(pg_type_to_cast("timestamp with time zone"), "timestamptz");
1641        assert_eq!(pg_type_to_cast("date"), "date");
1642        assert_eq!(pg_type_to_cast("time"), "time");
1643        assert_eq!(pg_type_to_cast("time without time zone"), "time");
1644    }
1645
1646    #[test]
1647    fn bool_normalises_to_canonical_name() {
1648        assert_eq!(pg_type_to_cast("boolean"), "bool");
1649        assert_eq!(pg_type_to_cast("bool"), "bool");
1650    }
1651
1652    #[test]
1653    fn text_types_produce_empty_hint_meaning_no_cast() {
1654        assert_eq!(pg_type_to_cast("text"), "");
1655        assert_eq!(pg_type_to_cast("varchar"), "");
1656        assert_eq!(pg_type_to_cast("unknown_type"), "");
1657    }
1658}