Skip to main content

fraiseql_core/runtime/
executor.rs

1//! Query executor - main runtime execution engine.
2//!
3//! # Async Cancellation Strategy
4//!
5//! Queries are protected from long-running operations through the `query_timeout_ms`
6//! configuration in `RuntimeConfig`. When a query exceeds this timeout, the operation
7//! is cancelled via `tokio::time::timeout()`, which aborts the future.
8//!
9//! - **Default timeout**: 30 seconds
10//! - **No timeout**: Set `query_timeout_ms` to 0
11//! - **Custom timeout**: Set `query_timeout_ms` to desired milliseconds
12//!
//! For graceful shutdown of long-running tasks, callers can wrap `execute()` calls
//! with their own `tokio::time::timeout()` or use `tokio_util::task::AbortOnDropHandle`
//! for task lifecycle management.
16
17use std::{sync::Arc, time::Duration};
18
19use super::{
20    ExecutionContext, JsonbStrategy, QueryMatcher, QueryPlanner, ResultProjector, RuntimeConfig,
21    filter_fields,
22    mutation_result::{MutationOutcome, parse_mutation_row, populate_error_fields},
23};
24#[cfg(test)]
25use crate::db::types::{DatabaseType, PoolMetrics};
26use crate::{
27    db::{WhereClause, projection_generator::PostgresProjectionGenerator, traits::DatabaseAdapter},
28    error::{FraiseQLError, Result},
29    graphql::parse_query,
30    schema::{CompiledSchema, IntrospectionResponses, SecurityConfig, SqlProjectionHint},
31    security::{FieldAccessError, SecurityContext},
32};
33
/// Query type classification for routing.
///
/// The executor classifies each incoming request into one of these variants and
/// then dispatches on the variant to pick the handler.
#[derive(Debug, Clone, PartialEq, Eq)]
enum QueryType {
    /// Regular GraphQL query (non-analytics).
    Regular,

    /// Aggregate analytics query (ends with _aggregate).
    /// Contains the full query name (e.g., "sales_aggregate").
    Aggregate(String),

    /// Window function query (ends with _window).
    /// Contains the full query name (e.g., "sales_window").
    Window(String),

    /// Federation query (_service or _entities).
    /// Contains the query name ("_service" or "_entities").
    Federation(String),

    /// Introspection query (`__schema`).
    IntrospectionSchema,

    /// Introspection query (`__type(name: "...")`).
    /// Contains the requested type name.
    IntrospectionType(String),

    /// GraphQL mutation.
    /// Contains the root field name (e.g., "createMachine").
    Mutation(String),
}
63
64/// Query executor - executes compiled GraphQL queries.
65///
66/// This is the main entry point for runtime query execution.
67/// It coordinates matching, planning, execution, and projection.
68///
69/// # Type Parameters
70///
71/// * `A` - The database adapter type (implements `DatabaseAdapter` trait)
72///
73/// # Ownership and Lifetimes
74///
75/// The executor holds owned references to schema and runtime data, with no borrowed pointers:
76/// - `schema`: Owned `CompiledSchema` (immutable after construction)
77/// - `adapter`: Shared via `Arc<A>` to allow multiple executors/tasks to use the same connection
78///   pool
79/// - `introspection`: Owned cached GraphQL schema responses
80/// - `config`: Owned runtime configuration
81///
82/// **No explicit lifetimes required** - all data is either owned or wrapped in `Arc`,
83/// so the executor can be stored in long-lived structures without lifetime annotations or
84/// borrow-checker issues.
85///
86/// # Concurrency
87///
88/// `Executor<A>` is `Send + Sync` when `A` is `Send + Sync`. It can be safely shared across
89/// threads and tasks without cloning:
90/// ```ignore
91/// let executor = Arc::new(Executor::new(schema, adapter, config));
92/// // Can be cloned into multiple tasks
93/// let exec_clone = executor.clone();
94/// tokio::spawn(async move {
95///     let result = exec_clone.execute(query, vars).await;
96/// });
97/// ```
98///
99/// # Query Timeout
100///
101/// Queries are protected by the `query_timeout_ms` configuration in `RuntimeConfig` (default: 30s).
102/// When a query exceeds this timeout, it returns `FraiseQLError::Timeout` without panicking.
103/// Set `query_timeout_ms` to 0 to disable timeout enforcement.
104pub struct Executor<A: DatabaseAdapter> {
105    /// Compiled schema with optimized SQL templates
106    schema: CompiledSchema,
107
108    /// Shared database adapter for query execution
109    /// Wrapped in Arc to allow multiple executors to use the same connection pool
110    adapter: Arc<A>,
111
112    /// Query matching engine (stateless)
113    matcher: QueryMatcher,
114
115    /// Query execution planner (stateless)
116    planner: QueryPlanner,
117
118    /// Runtime configuration (timeouts, complexity limits, etc.)
119    config: RuntimeConfig,
120
121    /// Pre-built introspection responses cached for `__schema` and `__type` queries
122    /// Avoids recomputing schema introspection on every request
123    introspection: IntrospectionResponses,
124}
125
126impl<A: DatabaseAdapter> Executor<A> {
127    /// Create new executor.
128    ///
129    /// # Arguments
130    ///
131    /// * `schema` - Compiled schema
132    /// * `adapter` - Database adapter
133    ///
134    /// # Example
135    ///
136    /// ```rust,ignore
137    /// let schema = CompiledSchema::from_json(schema_json)?;
138    /// let adapter = PostgresAdapter::new(connection_string).await?;
139    /// let executor = Executor::new(schema, Arc::new(adapter));
140    /// ```
141    #[must_use]
142    pub fn new(schema: CompiledSchema, adapter: Arc<A>) -> Self {
143        Self::with_config(schema, adapter, RuntimeConfig::default())
144    }
145
146    /// Create new executor with custom configuration.
147    ///
148    /// # Arguments
149    ///
150    /// * `schema` - Compiled schema
151    /// * `adapter` - Database adapter
152    /// * `config` - Runtime configuration
153    #[must_use]
154    pub fn with_config(schema: CompiledSchema, adapter: Arc<A>, config: RuntimeConfig) -> Self {
155        let matcher = QueryMatcher::new(schema.clone());
156        let planner = QueryPlanner::new(config.cache_query_plans);
157        // Build introspection responses at startup (zero-cost at runtime)
158        let introspection = IntrospectionResponses::build(&schema);
159
160        Self {
161            schema,
162            adapter,
163            matcher,
164            planner,
165            config,
166            introspection,
167        }
168    }
169
170    /// Execute a GraphQL query.
171    ///
172    /// # Arguments
173    ///
174    /// * `query` - GraphQL query string
175    /// * `variables` - Query variables (optional)
176    ///
177    /// # Returns
178    ///
179    /// GraphQL response as JSON string
180    ///
181    /// # Errors
182    ///
183    /// Returns error if:
184    /// - Query is malformed
185    /// - Query references undefined operations
186    /// - Database execution fails
187    /// - Result projection fails
188    ///
189    /// # Example
190    ///
191    /// ```rust,ignore
192    /// let query = r#"query { users { id name } }"#;
193    /// let result = executor.execute(query, None).await?;
194    /// println!("{}", result);
195    /// ```
196    pub async fn execute(
197        &self,
198        query: &str,
199        variables: Option<&serde_json::Value>,
200    ) -> Result<String> {
201        // Apply query timeout if configured
202        if self.config.query_timeout_ms > 0 {
203            let timeout_duration = Duration::from_millis(self.config.query_timeout_ms);
204            tokio::time::timeout(timeout_duration, self.execute_internal(query, variables))
205                .await
206                .map_err(|_| {
207                    // Truncate query if too long for error reporting
208                    let query_snippet = if query.len() > 100 {
209                        format!("{}...", &query[..100])
210                    } else {
211                        query.to_string()
212                    };
213                    FraiseQLError::Timeout {
214                        timeout_ms: self.config.query_timeout_ms,
215                        query:      Some(query_snippet),
216                    }
217                })?
218        } else {
219            self.execute_internal(query, variables).await
220        }
221    }
222
223    /// Internal execution logic (called by execute with timeout wrapper).
224    async fn execute_internal(
225        &self,
226        query: &str,
227        variables: Option<&serde_json::Value>,
228    ) -> Result<String> {
229        // 1. Classify query type
230        let query_type = self.classify_query(query)?;
231
232        // 2. Route to appropriate handler
233        match query_type {
234            QueryType::Regular => self.execute_regular_query(query, variables).await,
235            QueryType::Aggregate(query_name) => {
236                self.execute_aggregate_dispatch(&query_name, variables).await
237            },
238            QueryType::Window(query_name) => {
239                self.execute_window_dispatch(&query_name, variables).await
240            },
241            QueryType::Federation(query_name) => {
242                self.execute_federation_query(&query_name, query, variables).await
243            },
244            QueryType::IntrospectionSchema => {
245                // Return pre-built __schema response (zero-cost at runtime)
246                Ok(self.introspection.schema_response.clone())
247            },
248            QueryType::IntrospectionType(type_name) => {
249                // Return pre-built __type response (zero-cost at runtime)
250                Ok(self.introspection.get_type_response(&type_name))
251            },
252            QueryType::Mutation(mutation_name) => {
253                self.execute_mutation_query(&mutation_name, variables).await
254            },
255        }
256    }
257
258    /// Execute a GraphQL query with user context for field-level access control.
259    ///
260    /// This method validates that the user has permission to access all requested
261    /// fields before executing the query. If field filtering is enabled in the
262    /// `RuntimeConfig` and the user lacks required scopes, this returns an error.
263    ///
264    /// # Arguments
265    ///
266    /// * `query` - GraphQL query string
267    /// * `variables` - Query variables (optional)
268    /// * `user_scopes` - User's scopes from JWT token (pass empty slice if unauthenticated)
269    ///
270    /// # Returns
271    ///
272    /// GraphQL response as JSON string, or error if access denied
273    ///
274    /// # Example
275    ///
276    /// ```rust,ignore
277    /// let query = r#"query { users { id name salary } }"#;
278    /// let user_scopes = user.scopes.iter().map(|s| s.as_str()).collect::<Vec<_>>();
279    /// let result = executor.execute_with_scopes(query, None, &user_scopes).await?;
280    /// ```
281    pub async fn execute_with_scopes(
282        &self,
283        query: &str,
284        variables: Option<&serde_json::Value>,
285        user_scopes: &[String],
286    ) -> Result<String> {
287        // 1. Classify query type
288        let query_type = self.classify_query(query)?;
289
290        // 2. Validate field access if filter is configured
291        if let Some(ref filter) = self.config.field_filter {
292            // Only validate for regular queries (not introspection)
293            if matches!(query_type, QueryType::Regular) {
294                self.validate_field_access(query, variables, user_scopes, filter)?;
295            }
296        }
297
298        // 3. Route to appropriate handler (same as execute)
299        match query_type {
300            QueryType::Regular => self.execute_regular_query(query, variables).await,
301            QueryType::Aggregate(query_name) => {
302                self.execute_aggregate_dispatch(&query_name, variables).await
303            },
304            QueryType::Window(query_name) => {
305                self.execute_window_dispatch(&query_name, variables).await
306            },
307            QueryType::Federation(query_name) => {
308                self.execute_federation_query(&query_name, query, variables).await
309            },
310            QueryType::IntrospectionSchema => Ok(self.introspection.schema_response.clone()),
311            QueryType::IntrospectionType(type_name) => {
312                Ok(self.introspection.get_type_response(&type_name))
313            },
314            QueryType::Mutation(mutation_name) => {
315                self.execute_mutation_query(&mutation_name, variables).await
316            },
317        }
318    }
319
320    /// Validate that user has access to all requested fields.
321    fn validate_field_access(
322        &self,
323        query: &str,
324        variables: Option<&serde_json::Value>,
325        user_scopes: &[String],
326        filter: &crate::security::FieldFilter,
327    ) -> Result<()> {
328        // Parse query to get field selections
329        let query_match = self.matcher.match_query(query, variables)?;
330
331        // Get the return type name from the query definition
332        let type_name = &query_match.query_def.return_type;
333
334        // Validate each requested field
335        let field_refs: Vec<&str> = query_match.fields.iter().map(String::as_str).collect();
336        let errors = filter.validate_fields(type_name, &field_refs, user_scopes);
337
338        if errors.is_empty() {
339            Ok(())
340        } else {
341            // Return the first error (could aggregate all errors if desired)
342            let first_error = &errors[0];
343            Err(FraiseQLError::Authorization {
344                message:  first_error.message.clone(),
345                action:   Some("read".to_string()),
346                resource: Some(format!("{}.{}", first_error.type_name, first_error.field_name)),
347            })
348        }
349    }
350
351    /// Execute a GraphQL query with cancellation support via ExecutionContext.
352    ///
353    /// This method allows graceful cancellation of long-running queries through a
354    /// cancellation token. If the token is cancelled during execution, the query
355    /// returns a `FraiseQLError::Cancelled` error.
356    ///
357    /// # Arguments
358    ///
359    /// * `query` - GraphQL query string
360    /// * `variables` - Query variables (optional)
361    /// * `ctx` - ExecutionContext with cancellation token
362    ///
363    /// # Returns
364    ///
365    /// GraphQL response as JSON string, or error if cancelled or execution fails
366    ///
367    /// # Example
368    ///
369    /// ```rust,ignore
370    /// let ctx = ExecutionContext::new("user-query-123".to_string());
371    /// let cancel_token = ctx.cancellation_token().clone();
372    ///
373    /// // Spawn a task to cancel after 5 seconds
374    /// tokio::spawn(async move {
375    ///     tokio::time::sleep(Duration::from_secs(5)).await;
376    ///     cancel_token.cancel();
377    /// });
378    ///
379    /// let result = executor.execute_with_context(query, None, &ctx).await;
380    /// match result {
381    ///     Err(FraiseQLError::Cancelled { reason, .. }) => {
382    ///         eprintln!("Query cancelled: {}", reason);
383    ///     }
384    ///     Ok(response) => println!("{}", response),
385    ///     Err(e) => eprintln!("Error: {}", e),
386    /// }
387    /// ```
388    pub async fn execute_with_context(
389        &self,
390        query: &str,
391        variables: Option<&serde_json::Value>,
392        ctx: &ExecutionContext,
393    ) -> Result<String> {
394        // Check if already cancelled before starting
395        if ctx.is_cancelled() {
396            return Err(FraiseQLError::cancelled(
397                ctx.query_id().to_string(),
398                "Query cancelled before execution".to_string(),
399            ));
400        }
401
402        let token = ctx.cancellation_token().clone();
403
404        // Use tokio::select! to race between execution and cancellation
405        tokio::select! {
406            result = self.execute(query, variables) => {
407                result
408            }
409            () = token.cancelled() => {
410                Err(FraiseQLError::cancelled(
411                    ctx.query_id().to_string(),
412                    "Query cancelled during execution".to_string(),
413                ))
414            }
415        }
416    }
417
418    /// Execute a GraphQL query with row-level security (RLS) context.
419    ///
420    /// This method applies RLS filtering based on the user's SecurityContext
421    /// before executing the query. If an RLS policy is configured in RuntimeConfig,
422    /// it will be evaluated to determine what rows the user can access.
423    ///
424    /// # Arguments
425    ///
426    /// * `query` - GraphQL query string
427    /// * `variables` - Query variables (optional)
428    /// * `security_context` - User's security context (authentication + permissions)
429    ///
430    /// # Returns
431    ///
432    /// GraphQL response as JSON string, or error if access denied by RLS
433    ///
434    /// # Example
435    ///
436    /// ```rust,ignore
437    /// let query = r#"query { posts { id title } }"#;
438    /// let context = SecurityContext {
439    ///     user_id: "user1".to_string(),
440    ///     roles: vec!["user".to_string()],
441    ///     tenant_id: None,
442    ///     scopes: vec![],
443    ///     attributes: HashMap::new(),
444    ///     request_id: "req-1".to_string(),
445    ///     ip_address: None,
446    ///     authenticated_at: Utc::now(),
447    ///     expires_at: Utc::now() + Duration::hours(1),
448    ///     issuer: None,
449    ///     audience: None,
450    /// };
451    /// let result = executor.execute_with_security(query, None, &context).await?;
452    /// ```
453    pub async fn execute_with_security(
454        &self,
455        query: &str,
456        variables: Option<&serde_json::Value>,
457        security_context: &SecurityContext,
458    ) -> Result<String> {
459        // Apply query timeout if configured
460        if self.config.query_timeout_ms > 0 {
461            let timeout_duration = Duration::from_millis(self.config.query_timeout_ms);
462            tokio::time::timeout(
463                timeout_duration,
464                self.execute_with_security_internal(query, variables, security_context),
465            )
466            .await
467            .map_err(|_| {
468                let query_snippet = if query.len() > 100 {
469                    format!("{}...", &query[..100])
470                } else {
471                    query.to_string()
472                };
473                FraiseQLError::Timeout {
474                    timeout_ms: self.config.query_timeout_ms,
475                    query:      Some(query_snippet),
476                }
477            })?
478        } else {
479            self.execute_with_security_internal(query, variables, security_context).await
480        }
481    }
482
483    /// Internal execution logic with security context (called by execute_with_security with timeout
484    /// wrapper).
485    async fn execute_with_security_internal(
486        &self,
487        query: &str,
488        variables: Option<&serde_json::Value>,
489        security_context: &SecurityContext,
490    ) -> Result<String> {
491        // 1. Classify query type
492        let query_type = self.classify_query(query)?;
493
494        // 2. Route to appropriate handler (with RLS support for regular queries)
495        match query_type {
496            QueryType::Regular => {
497                self.execute_regular_query_with_security(query, variables, security_context)
498                    .await
499            },
500            // Other query types don't support RLS yet
501            QueryType::Aggregate(query_name) => {
502                self.execute_aggregate_dispatch(&query_name, variables).await
503            },
504            QueryType::Window(query_name) => {
505                self.execute_window_dispatch(&query_name, variables).await
506            },
507            QueryType::Federation(query_name) => {
508                self.execute_federation_query(&query_name, query, variables).await
509            },
510            QueryType::IntrospectionSchema => Ok(self.introspection.schema_response.clone()),
511            QueryType::IntrospectionType(type_name) => {
512                Ok(self.introspection.get_type_response(&type_name))
513            },
514            QueryType::Mutation(mutation_name) => {
515                self.execute_mutation_query(&mutation_name, variables).await
516            },
517        }
518    }
519
520    /// Check if a specific field can be accessed with given scopes.
521    ///
522    /// This is a convenience method for checking field access without executing a query.
523    ///
524    /// # Arguments
525    ///
526    /// * `type_name` - The GraphQL type name
527    /// * `field_name` - The field name
528    /// * `user_scopes` - User's scopes from JWT token
529    ///
530    /// # Returns
531    ///
532    /// `Ok(())` if access is allowed, `Err(FieldAccessError)` if denied
533    pub fn check_field_access(
534        &self,
535        type_name: &str,
536        field_name: &str,
537        user_scopes: &[String],
538    ) -> std::result::Result<(), FieldAccessError> {
539        if let Some(ref filter) = self.config.field_filter {
540            filter.can_access(type_name, field_name, user_scopes)
541        } else {
542            // No filter configured, allow all access
543            Ok(())
544        }
545    }
546
547    /// Apply field-level RBAC filtering to projection fields.
548    ///
549    /// Filters the projection fields based on the user's security context and field scope
550    /// requirements. Returns only fields that the user is authorized to access.
551    ///
552    /// # Arguments
553    ///
554    /// * `return_type` - The GraphQL return type name (e.g., "User", "Post")
555    /// * `projection_fields` - The originally requested field names
556    /// * `security_context` - The user's security context with roles
557    ///
558    /// # Returns
559    ///
560    /// Filtered list of accessible field names in the same order as requested
561    fn apply_field_rbac_filtering(
562        &self,
563        return_type: &str,
564        projection_fields: Vec<String>,
565        security_context: &SecurityContext,
566    ) -> Result<Vec<String>> {
567        // Try to extract security config from compiled schema
568        if let Some(ref security_json) = self.schema.security {
569            // Deserialize security config
570            let security_config: SecurityConfig = serde_json::from_value(security_json.clone())
571                .map_err(|_| FraiseQLError::Validation {
572                    message: "Invalid security configuration in compiled schema".to_string(),
573                    path:    Some("schema.security".to_string()),
574                })?;
575
576            // Find the type in the schema
577            if let Some(type_def) = self.schema.types.iter().find(|t| t.name == return_type) {
578                // Filter fields based on user roles and scope requirements
579                let accessible_fields =
580                    filter_fields(security_context, &security_config, &type_def.fields);
581
582                // Map back to field names, preserving order from projection_fields
583                let accessible_names: std::collections::HashSet<String> =
584                    accessible_fields.iter().map(|f| f.name.clone()).collect();
585
586                let filtered: Vec<String> = projection_fields
587                    .into_iter()
588                    .filter(|name| accessible_names.contains(name))
589                    .collect();
590
591                return Ok(filtered);
592            }
593        }
594
595        // If no security config or type not found, return all projection fields (no filtering)
596        Ok(projection_fields)
597    }
598
599    /// Execute a regular query with row-level security (RLS) filtering.
600    ///
601    /// This method:
602    /// 1. Validates the user's security context (token expiration, etc.)
603    /// 2. Evaluates RLS policies to determine what rows the user can access
604    /// 3. Composes RLS filters with user-provided WHERE clauses
605    /// 4. Passes the composed filter to the database adapter for SQL-level filtering
606    ///
607    /// RLS filtering happens at the database level, not in Rust, ensuring:
608    /// - High performance (database can optimize filters)
609    /// - Correct handling of pagination (LIMIT applied after RLS filtering)
610    /// - Type-safe composition via WhereClause enum
611    async fn execute_regular_query_with_security(
612        &self,
613        query: &str,
614        variables: Option<&serde_json::Value>,
615        security_context: &SecurityContext,
616    ) -> Result<String> {
617        // 1. Validate security context (check expiration, etc.)
618        if security_context.is_expired() {
619            return Err(FraiseQLError::Validation {
620                message: "Security token has expired".to_string(),
621                path:    Some("request.authorization".to_string()),
622            });
623        }
624
625        // 2. Match query to compiled template
626        let query_match = self.matcher.match_query(query, variables)?;
627
628        // 3. Create execution plan
629        let plan = self.planner.plan(&query_match)?;
630
631        // 4. Evaluate RLS policy and build WHERE clause filter
632        let rls_where_clause: Option<WhereClause> =
633            if let Some(ref rls_policy) = self.config.rls_policy {
634                // Evaluate RLS policy with user's security context
635                rls_policy.evaluate(security_context, &query_match.query_def.name)?
636            } else {
637                // No RLS policy configured, allow all access
638                None
639            };
640
641        // 5. Get SQL source from query definition
642        let sql_source =
643            query_match
644                .query_def
645                .sql_source
646                .as_ref()
647                .ok_or_else(|| FraiseQLError::Validation {
648                    message: "Query has no SQL source".to_string(),
649                    path:    None,
650                })?;
651
652        // 6. Generate SQL projection hint for requested fields (optimization)
653        // Strategy selection: Project (extract fields) vs Stream (return full JSONB)
654        let projection_hint = if !plan.projection_fields.is_empty()
655            && plan.jsonb_strategy == JsonbStrategy::Project
656        {
657            let generator = PostgresProjectionGenerator::new();
658            let projection_sql = generator
659                .generate_projection_sql(&plan.projection_fields)
660                .unwrap_or_else(|_| "data".to_string());
661
662            Some(SqlProjectionHint {
663                database:                    "postgresql".to_string(),
664                projection_template:         projection_sql,
665                estimated_reduction_percent: 50,
666            })
667        } else {
668            // Stream strategy: return full JSONB, no projection hint
669            None
670        };
671
672        // 7. Execute query with RLS WHERE clause filter
673        // The database adapter handles composition of RLS filter with user filters
674        // and generates the final SQL with both constraints applied
675        let results = self
676            .adapter
677            .execute_with_projection(
678                sql_source,
679                projection_hint.as_ref(),
680                rls_where_clause.as_ref(),
681                None,
682            )
683            .await?;
684
685        // 8. Apply field-level RBAC filtering
686        // Filter projection fields based on user roles and field scope requirements
687        let filtered_projection_fields = self.apply_field_rbac_filtering(
688            &query_match.query_def.return_type,
689            plan.projection_fields,
690            security_context,
691        )?;
692
693        // 9. Project results to accessible fields only
694        let projector = ResultProjector::new(filtered_projection_fields);
695        let projected = projector.project_results(&results, query_match.query_def.returns_list)?;
696
697        // 10. Wrap in GraphQL data envelope
698        let response =
699            ResultProjector::wrap_in_data_envelope(projected, &query_match.query_def.name);
700
701        // 11. Serialize to JSON string
702        Ok(serde_json::to_string(&response)?)
703    }
704
705    async fn execute_regular_query(
706        &self,
707        query: &str,
708        variables: Option<&serde_json::Value>,
709    ) -> Result<String> {
710        // 1. Match query to compiled template
711        let query_match = self.matcher.match_query(query, variables)?;
712
713        // 2. Create execution plan
714        let plan = self.planner.plan(&query_match)?;
715
716        // 3. Execute SQL query
717        let sql_source = query_match.query_def.sql_source.as_ref().ok_or_else(|| {
718            crate::error::FraiseQLError::Validation {
719                message: "Query has no SQL source".to_string(),
720                path:    None,
721            }
722        })?;
723
724        // 3a. Generate SQL projection hint for requested fields (optimization)
725        // Strategy selection: Project (extract fields) vs Stream (return full JSONB)
726        // This reduces payload by 40-55% by projecting only requested fields at the database level
727        let projection_hint = if !plan.projection_fields.is_empty()
728            && plan.jsonb_strategy == JsonbStrategy::Project
729        {
730            let generator = PostgresProjectionGenerator::new();
731            let projection_sql = generator
732                .generate_projection_sql(&plan.projection_fields)
733                .unwrap_or_else(|_| "data".to_string());
734
735            Some(SqlProjectionHint {
736                database:                    "postgresql".to_string(),
737                projection_template:         projection_sql,
738                estimated_reduction_percent: 50,
739            })
740        } else {
741            // Stream strategy: return full JSONB, no projection hint
742            None
743        };
744
745        let results = self
746            .adapter
747            .execute_with_projection(sql_source, projection_hint.as_ref(), None, None)
748            .await?;
749
750        // 4. Project results
751        let projector = ResultProjector::new(plan.projection_fields);
752        let projected = projector.project_results(&results, query_match.query_def.returns_list)?;
753
754        // 5. Wrap in GraphQL data envelope
755        let response =
756            ResultProjector::wrap_in_data_envelope(projected, &query_match.query_def.name);
757
758        // 6. Serialize to JSON string
759        Ok(serde_json::to_string(&response)?)
760    }
761
762    /// Execute a GraphQL mutation by calling the configured PostgreSQL function.
763    ///
764    /// Looks up the `MutationDefinition`, calls `execute_function_call` on the adapter,
765    /// parses the returned `mutation_response` row, and builds the GraphQL response
766    /// (either the success entity or a populated error-type object).
767    async fn execute_mutation_query(
768        &self,
769        mutation_name: &str,
770        variables: Option<&serde_json::Value>,
771    ) -> Result<String> {
772        // 1. Locate the mutation definition
773        let mutation_def =
774            self.schema.find_mutation(mutation_name).ok_or_else(|| {
775                FraiseQLError::Validation {
776                    message: format!("Unknown mutation: {mutation_name}"),
777                    path:    None,
778                }
779            })?;
780
781        // 2. Require a sql_source (PostgreSQL function name)
782        let sql_source = mutation_def.sql_source.as_deref().ok_or_else(|| {
783            FraiseQLError::Validation {
784                message: format!("Mutation '{mutation_name}' has no sql_source configured"),
785                path:    None,
786            }
787        })?;
788
789        // 3. Build positional args Vec from variables in ArgumentDefinition order
790        let vars_obj = variables.and_then(|v| v.as_object());
791        let args: Vec<serde_json::Value> = mutation_def
792            .arguments
793            .iter()
794            .map(|arg| {
795                vars_obj
796                    .and_then(|obj| obj.get(&arg.name))
797                    .cloned()
798                    .unwrap_or(serde_json::Value::Null)
799            })
800            .collect();
801
802        // 4. Call the database function
803        let rows = self.adapter.execute_function_call(sql_source, &args).await?;
804
805        // 5. Expect at least one row
806        let row = rows.into_iter().next().ok_or_else(|| FraiseQLError::Validation {
807            message: format!(
808                "Mutation '{mutation_name}': function returned no rows"
809            ),
810            path: None,
811        })?;
812
813        // 6. Parse the mutation_response row
814        let outcome = parse_mutation_row(&row)?;
815
816        // Clone name and return_type to avoid borrow issues after schema lookups
817        let mutation_return_type = mutation_def.return_type.clone();
818        let mutation_name_owned = mutation_name.to_string();
819
820        let result_json = match outcome {
821            MutationOutcome::Success { entity, entity_type, .. } => {
822                // Determine the GraphQL __typename
823                let typename = entity_type
824                    .or_else(|| {
825                        // Fall back to first non-error union member
826                        self.schema
827                            .find_union(&mutation_return_type)
828                            .and_then(|u| {
829                                u.member_types.iter().find(|t| {
830                                    self.schema
831                                        .find_type(t)
832                                        .map(|td| !td.is_error)
833                                        .unwrap_or(true)
834                                })
835                            })
836                            .cloned()
837                    })
838                    .unwrap_or_else(|| mutation_return_type.clone());
839
840                let mut obj = entity
841                    .as_object()
842                    .cloned()
843                    .unwrap_or_default();
844                obj.insert(
845                    "__typename".to_string(),
846                    serde_json::Value::String(typename),
847                );
848                serde_json::Value::Object(obj)
849            },
850            MutationOutcome::Error { status, metadata, .. } => {
851                // Find the matching error type from the return union
852                let error_type = self
853                    .schema
854                    .find_union(&mutation_return_type)
855                    .and_then(|u| {
856                        u.member_types.iter().find_map(|t| {
857                            let td = self.schema.find_type(t)?;
858                            if td.is_error { Some(td) } else { None }
859                        })
860                    });
861
862                match error_type {
863                    Some(td) => {
864                        let mut fields =
865                            populate_error_fields(&td.fields, &metadata);
866                        fields.insert(
867                            "__typename".to_string(),
868                            serde_json::Value::String(td.name.clone()),
869                        );
870                        // Include status so the client can act on it
871                        fields.insert(
872                            "status".to_string(),
873                            serde_json::Value::String(status),
874                        );
875                        serde_json::Value::Object(fields)
876                    },
877                    None => {
878                        // No error type defined: surface the status as a plain object
879                        serde_json::json!({ "__typename": mutation_return_type, "status": status })
880                    },
881                }
882            },
883        };
884
885        let response = ResultProjector::wrap_in_data_envelope(result_json, &mutation_name_owned);
886        Ok(serde_json::to_string(&response)?)
887    }
888
889    /// Classify query type based on operation name.
890    fn classify_query(&self, query: &str) -> Result<QueryType> {
891        // Check for introspection queries first (highest priority)
892        if let Some(introspection_type) = self.detect_introspection(query) {
893            return Ok(introspection_type);
894        }
895
896        // Check for federation queries (higher priority than regular queries)
897        if let Some(federation_type) = self.detect_federation(query) {
898            return Ok(federation_type);
899        }
900
901        // Parse the query to extract the root field name and operation type
902        let parsed = parse_query(query).map_err(|e| FraiseQLError::Parse {
903            message:  e.to_string(),
904            location: "query".to_string(),
905        })?;
906
907        let root_field = &parsed.root_field;
908
909        // Mutations are routed by operation type
910        if parsed.operation_type == "mutation" {
911            return Ok(QueryType::Mutation(root_field.clone()));
912        }
913
914        // Check if it's an aggregate query (ends with _aggregate)
915        if root_field.ends_with("_aggregate") {
916            return Ok(QueryType::Aggregate(root_field.clone()));
917        }
918
919        // Check if it's a window query (ends with _window)
920        if root_field.ends_with("_window") {
921            return Ok(QueryType::Window(root_field.clone()));
922        }
923
924        // Otherwise, it's a regular query
925        Ok(QueryType::Regular)
926    }
927
928    /// Detect if a query is an introspection query.
929    ///
930    /// Returns `Some(QueryType)` for introspection queries, `None` otherwise.
931    fn detect_introspection(&self, query: &str) -> Option<QueryType> {
932        let query_trimmed = query.trim();
933
934        // Check for __schema query
935        if query_trimmed.contains("__schema") {
936            return Some(QueryType::IntrospectionSchema);
937        }
938
939        // Check for __type(name: "...") query
940        if query_trimmed.contains("__type") {
941            // Extract the type name from __type(name: "TypeName")
942            if let Some(type_name) = self.extract_type_argument(query_trimmed) {
943                return Some(QueryType::IntrospectionType(type_name));
944            }
945            // If no type name found, return schema introspection as fallback
946            return Some(QueryType::IntrospectionSchema);
947        }
948
949        None
950    }
951
952    /// Detect if a query is a federation query (_service or _entities).
953    ///
954    /// Returns `Some(QueryType)` for federation queries, `None` otherwise.
955    fn detect_federation(&self, query: &str) -> Option<QueryType> {
956        let query_trimmed = query.trim();
957
958        // Check for _service query
959        if query_trimmed.contains("_service") {
960            return Some(QueryType::Federation("_service".to_string()));
961        }
962
963        // Check for _entities query
964        if query_trimmed.contains("_entities") {
965            return Some(QueryType::Federation("_entities".to_string()));
966        }
967
968        None
969    }
970
971    /// Extract the type name argument from `__type(name: "TypeName")`.
972    fn extract_type_argument(&self, query: &str) -> Option<String> {
973        // Find __type(name: "..." pattern
974        // Supports: __type(name: "User"), __type(name:"User"), __type(name: 'User')
975        let type_pos = query.find("__type")?;
976        let after_type = &query[type_pos + 6..];
977
978        // Find the opening parenthesis
979        let paren_pos = after_type.find('(')?;
980        let after_paren = &after_type[paren_pos + 1..];
981
982        // Find name: and extract the value
983        let name_pos = after_paren.find("name")?;
984        let after_name = &after_paren[name_pos + 4..].trim_start();
985
986        // Skip colon
987        let after_colon = if let Some(stripped) = after_name.strip_prefix(':') {
988            stripped.trim_start()
989        } else {
990            after_name
991        };
992
993        // Extract string value (either "..." or '...')
994        let quote_char = after_colon.chars().next()?;
995        if quote_char != '"' && quote_char != '\'' {
996            return None;
997        }
998
999        let after_quote = &after_colon[1..];
1000        let end_quote = after_quote.find(quote_char)?;
1001        Some(after_quote[..end_quote].to_string())
1002    }
1003
1004    /// Execute an aggregate query dispatch.
1005    async fn execute_aggregate_dispatch(
1006        &self,
1007        query_name: &str,
1008        variables: Option<&serde_json::Value>,
1009    ) -> Result<String> {
1010        // Extract table name from query name (e.g., "sales_aggregate" -> "tf_sales")
1011        let table_name =
1012            query_name.strip_suffix("_aggregate").ok_or_else(|| FraiseQLError::Validation {
1013                message: format!("Invalid aggregate query name: {}", query_name),
1014                path:    None,
1015            })?;
1016
1017        let fact_table_name = format!("tf_{}", table_name);
1018
1019        // Get fact table metadata from schema
1020        let metadata_json = self.schema.get_fact_table(&fact_table_name).ok_or_else(|| {
1021            FraiseQLError::Validation {
1022                message: format!("Fact table '{}' not found in schema", fact_table_name),
1023                path:    Some(format!("fact_tables.{}", fact_table_name)),
1024            }
1025        })?;
1026
1027        // Parse metadata into FactTableMetadata
1028        let metadata: crate::compiler::fact_table::FactTableMetadata =
1029            serde_json::from_value(metadata_json.clone())?;
1030
1031        // Parse query variables into aggregate query JSON
1032        let empty_json = serde_json::json!({});
1033        let query_json = variables.unwrap_or(&empty_json);
1034
1035        // Execute aggregate query
1036        self.execute_aggregate_query(query_json, query_name, &metadata).await
1037    }
1038
1039    /// Execute a window query dispatch.
1040    async fn execute_window_dispatch(
1041        &self,
1042        query_name: &str,
1043        variables: Option<&serde_json::Value>,
1044    ) -> Result<String> {
1045        // Extract table name from query name (e.g., "sales_window" -> "tf_sales")
1046        let table_name =
1047            query_name.strip_suffix("_window").ok_or_else(|| FraiseQLError::Validation {
1048                message: format!("Invalid window query name: {}", query_name),
1049                path:    None,
1050            })?;
1051
1052        let fact_table_name = format!("tf_{}", table_name);
1053
1054        // Get fact table metadata from schema
1055        let metadata_json = self.schema.get_fact_table(&fact_table_name).ok_or_else(|| {
1056            FraiseQLError::Validation {
1057                message: format!("Fact table '{}' not found in schema", fact_table_name),
1058                path:    Some(format!("fact_tables.{}", fact_table_name)),
1059            }
1060        })?;
1061
1062        // Parse metadata into FactTableMetadata
1063        let metadata: crate::compiler::fact_table::FactTableMetadata =
1064            serde_json::from_value(metadata_json.clone())?;
1065
1066        // Parse query variables into window query JSON
1067        let empty_json = serde_json::json!({});
1068        let query_json = variables.unwrap_or(&empty_json);
1069
1070        // Execute window query
1071        self.execute_window_query(query_json, query_name, &metadata).await
1072    }
1073
1074    /// Execute a federation query (_service or _entities).
1075    async fn execute_federation_query(
1076        &self,
1077        query_name: &str,
1078        query: &str,
1079        variables: Option<&serde_json::Value>,
1080    ) -> Result<String> {
1081        match query_name {
1082            "_service" => self.execute_service_query().await,
1083            "_entities" => self.execute_entities_query(query, variables).await,
1084            _ => Err(FraiseQLError::Validation {
1085                message: format!("Unknown federation query: {}", query_name),
1086                path:    None,
1087            }),
1088        }
1089    }
1090
1091    /// Execute _service query returning federation SDL.
1092    async fn execute_service_query(&self) -> Result<String> {
1093        // Get federation metadata from schema
1094        let fed_metadata =
1095            self.schema.federation_metadata().ok_or_else(|| FraiseQLError::Validation {
1096                message: "Federation not enabled in schema".to_string(),
1097                path:    None,
1098            })?;
1099
1100        // Generate SDL with federation directives
1101        let raw_schema = self.schema.raw_schema();
1102        let sdl = crate::federation::generate_service_sdl(&raw_schema, &fed_metadata);
1103
1104        // Return federation response format
1105        let response = serde_json::json!({
1106            "data": {
1107                "_service": {
1108                    "sdl": sdl
1109                }
1110            }
1111        });
1112
1113        Ok(serde_json::to_string(&response)?)
1114    }
1115
1116    /// Execute _entities query resolving federation entities.
1117    async fn execute_entities_query(
1118        &self,
1119        query: &str,
1120        variables: Option<&serde_json::Value>,
1121    ) -> Result<String> {
1122        // Get federation metadata from schema
1123        let fed_metadata =
1124            self.schema.federation_metadata().ok_or_else(|| FraiseQLError::Validation {
1125                message: "Federation not enabled in schema".to_string(),
1126                path:    None,
1127            })?;
1128
1129        // Extract representations from variables
1130        let representations_value =
1131            variables.and_then(|v| v.get("representations")).ok_or_else(|| {
1132                FraiseQLError::Validation {
1133                    message: "_entities query requires 'representations' variable".to_string(),
1134                    path:    None,
1135                }
1136            })?;
1137
1138        // Parse representations
1139        let representations =
1140            crate::federation::parse_representations(representations_value, &fed_metadata)
1141                .map_err(|e| FraiseQLError::Validation {
1142                    message: format!("Failed to parse representations: {}", e),
1143                    path:    None,
1144                })?;
1145
1146        // Validate representations
1147        crate::federation::validate_representations(&representations, &fed_metadata).map_err(
1148            |errors| FraiseQLError::Validation {
1149                message: format!("Invalid representations: {}", errors.join("; ")),
1150                path:    None,
1151            },
1152        )?;
1153
1154        // Create federation resolver
1155        let fed_resolver = crate::federation::FederationResolver::new(fed_metadata);
1156
1157        // Extract actual field selection from GraphQL query AST
1158        let selection = match crate::federation::selection_parser::parse_field_selection(query) {
1159            Ok(sel) if !sel.fields.is_empty() => {
1160                // Ensure __typename is always selected
1161                let mut fields = sel.fields;
1162                if !fields.contains(&"__typename".to_string()) {
1163                    fields.push("__typename".to_string());
1164                }
1165                crate::federation::FieldSelection::new(fields)
1166            },
1167            _ => {
1168                // Fallback to wildcard if parsing fails or no fields extracted
1169                crate::federation::FieldSelection::new(vec![
1170                    "__typename".to_string(),
1171                    "*".to_string(), // Wildcard for all fields (will be expanded by resolver)
1172                ])
1173            },
1174        };
1175
1176        // Extract or create trace context for federation operations
1177        // Note: Trace context should ideally be passed from HTTP headers via ExecutionContext,
1178        // but for now we create a new context for tracing federation operations.
1179        // The trace context could be injected through the query variables or a request-scoped store
1180        // in future versions to correlate with the incoming HTTP trace headers.
1181        let trace_context = crate::federation::FederationTraceContext::new();
1182
1183        // Batch load entities from database with tracing support
1184        let entities = crate::federation::batch_load_entities_with_tracing(
1185            &representations,
1186            &fed_resolver,
1187            Arc::clone(&self.adapter),
1188            &selection,
1189            Some(trace_context),
1190        )
1191        .await?;
1192
1193        // Return federation response format
1194        let response = serde_json::json!({
1195            "data": {
1196                "_entities": entities
1197            }
1198        });
1199
1200        Ok(serde_json::to_string(&response)?)
1201    }
1202
1203    /// Execute a window query.
1204    ///
1205    /// # Arguments
1206    ///
1207    /// * `query_json` - JSON representation of the window query
1208    /// * `query_name` - GraphQL field name (e.g., "sales_window")
1209    /// * `metadata` - Fact table metadata
1210    ///
1211    /// # Returns
1212    ///
1213    /// GraphQL response as JSON string
1214    ///
1215    /// # Errors
1216    ///
1217    /// Returns error if:
1218    /// - Query parsing fails
1219    /// - Execution plan generation fails
1220    /// - SQL generation fails
1221    /// - Database execution fails
1222    /// - Result projection fails
1223    ///
1224    /// # Example
1225    ///
1226    /// ```rust,ignore
1227    /// let query_json = json!({
1228    ///     "table": "tf_sales",
1229    ///     "select": [{"type": "measure", "name": "revenue", "alias": "revenue"}],
1230    ///     "windows": [{
1231    ///         "function": {"type": "row_number"},
1232    ///         "alias": "rank",
1233    ///         "partitionBy": [{"type": "dimension", "path": "category"}],
1234    ///         "orderBy": [{"field": "revenue", "direction": "DESC"}]
1235    ///     }]
1236    /// });
1237    ///
1238    /// let metadata = /* fact table metadata */;
1239    /// let result = executor.execute_window_query(&query_json, "sales_window", &metadata).await?;
1240    /// ```
1241    pub async fn execute_window_query(
1242        &self,
1243        query_json: &serde_json::Value,
1244        query_name: &str,
1245        metadata: &crate::compiler::fact_table::FactTableMetadata,
1246    ) -> Result<String> {
1247        // 1. Parse JSON query into WindowRequest
1248        let request = super::WindowQueryParser::parse(query_json, metadata)?;
1249
1250        // 2. Generate execution plan (validates semantic names against metadata)
1251        let plan =
1252            crate::compiler::window_functions::WindowPlanner::plan(request, metadata.clone())?;
1253
1254        // 3. Generate SQL
1255        let sql_generator = super::WindowSqlGenerator::new(self.adapter.database_type());
1256        let sql = sql_generator.generate(&plan)?;
1257
1258        // 4. Execute SQL
1259        let rows = self.adapter.execute_raw_query(&sql.complete_sql).await?;
1260
1261        // 5. Project results
1262        let projected = super::WindowProjector::project(rows, &plan)?;
1263
1264        // 6. Wrap in GraphQL data envelope
1265        let response = super::WindowProjector::wrap_in_data_envelope(projected, query_name);
1266
1267        // 7. Serialize to JSON string
1268        Ok(serde_json::to_string(&response)?)
1269    }
1270
1271    /// Execute a query and return parsed JSON.
1272    ///
1273    /// Same as `execute()` but returns parsed `serde_json::Value` instead of string.
1274    pub async fn execute_json(
1275        &self,
1276        query: &str,
1277        variables: Option<&serde_json::Value>,
1278    ) -> Result<serde_json::Value> {
1279        let result_str = self.execute(query, variables).await?;
1280        Ok(serde_json::from_str(&result_str)?)
1281    }
1282
1283    /// Execute an aggregate query.
1284    ///
1285    /// # Arguments
1286    ///
1287    /// * `query_json` - JSON representation of the aggregate query
1288    /// * `query_name` - GraphQL field name (e.g., "sales_aggregate")
1289    /// * `metadata` - Fact table metadata
1290    ///
1291    /// # Returns
1292    ///
1293    /// GraphQL response as JSON string
1294    ///
1295    /// # Errors
1296    ///
1297    /// Returns error if:
1298    /// - Query parsing fails
1299    /// - Execution plan generation fails
1300    /// - SQL generation fails
1301    /// - Database execution fails
1302    /// - Result projection fails
1303    ///
1304    /// # Example
1305    ///
1306    /// ```rust,ignore
1307    /// let query_json = json!({
1308    ///     "table": "tf_sales",
1309    ///     "groupBy": { "category": true },
1310    ///     "aggregates": [{"count": {}}]
1311    /// });
1312    ///
1313    /// let metadata = /* fact table metadata */;
1314    /// let result = executor.execute_aggregate_query(&query_json, "sales_aggregate", &metadata).await?;
1315    /// ```
1316    pub async fn execute_aggregate_query(
1317        &self,
1318        query_json: &serde_json::Value,
1319        query_name: &str,
1320        metadata: &crate::compiler::fact_table::FactTableMetadata,
1321    ) -> Result<String> {
1322        // 1. Parse JSON query into AggregationRequest
1323        let request = super::AggregateQueryParser::parse(query_json, metadata)?;
1324
1325        // 2. Generate execution plan
1326        let plan =
1327            crate::compiler::aggregation::AggregationPlanner::plan(request, metadata.clone())?;
1328
1329        // 3. Generate SQL
1330        let sql_generator = super::AggregationSqlGenerator::new(self.adapter.database_type());
1331        let sql = sql_generator.generate(&plan)?;
1332
1333        // 4. Execute SQL
1334        let rows = self.adapter.execute_raw_query(&sql.complete_sql).await?;
1335
1336        // 5. Project results
1337        let projected = super::AggregationProjector::project(rows, &plan)?;
1338
1339        // 6. Wrap in GraphQL data envelope
1340        let response = super::AggregationProjector::wrap_in_data_envelope(projected, query_name);
1341
1342        // 7. Serialize to JSON string
1343        Ok(serde_json::to_string(&response)?)
1344    }
1345
1346    /// Get the compiled schema.
1347    #[must_use]
1348    pub const fn schema(&self) -> &CompiledSchema {
1349        &self.schema
1350    }
1351
1352    /// Get runtime configuration.
1353    #[must_use]
1354    pub const fn config(&self) -> &RuntimeConfig {
1355        &self.config
1356    }
1357
1358    /// Get database adapter reference.
1359    #[must_use]
1360    pub fn adapter(&self) -> &Arc<A> {
1361        &self.adapter
1362    }
1363}
1364
1365#[cfg(test)]
1366mod tests {
1367    use async_trait::async_trait;
1368
1369    use super::*;
1370    use crate::{
1371        db::{types::JsonbValue, where_clause::WhereClause},
1372        runtime::JsonbOptimizationOptions,
1373        schema::{AutoParams, CompiledSchema, QueryDefinition},
1374    };
1375
1376    /// Mock database adapter for testing.
1377    struct MockAdapter {
1378        mock_results: Vec<JsonbValue>,
1379    }
1380
1381    impl MockAdapter {
1382        fn new(mock_results: Vec<JsonbValue>) -> Self {
1383            Self { mock_results }
1384        }
1385    }
1386
1387    #[async_trait]
1388    impl DatabaseAdapter for MockAdapter {
1389        async fn execute_with_projection(
1390            &self,
1391            view: &str,
1392            _projection: Option<&crate::schema::SqlProjectionHint>,
1393            where_clause: Option<&WhereClause>,
1394            limit: Option<u32>,
1395        ) -> Result<Vec<JsonbValue>> {
1396            // Fall back to standard query for tests
1397            self.execute_where_query(view, where_clause, limit, None).await
1398        }
1399
1400        async fn execute_where_query(
1401            &self,
1402            _view: &str,
1403            _where_clause: Option<&WhereClause>,
1404            _limit: Option<u32>,
1405            _offset: Option<u32>,
1406        ) -> Result<Vec<JsonbValue>> {
1407            Ok(self.mock_results.clone())
1408        }
1409
1410        async fn health_check(&self) -> Result<()> {
1411            Ok(())
1412        }
1413
1414        fn database_type(&self) -> DatabaseType {
1415            DatabaseType::PostgreSQL
1416        }
1417
1418        fn pool_metrics(&self) -> PoolMetrics {
1419            PoolMetrics {
1420                total_connections:  1,
1421                active_connections: 0,
1422                idle_connections:   1,
1423                waiting_requests:   0,
1424            }
1425        }
1426
1427        async fn execute_raw_query(
1428            &self,
1429            _sql: &str,
1430        ) -> Result<Vec<std::collections::HashMap<String, serde_json::Value>>> {
1431            // Mock implementation: return empty results
1432            Ok(vec![])
1433        }
1434
1435        async fn execute_function_call(
1436            &self,
1437            _function_name: &str,
1438            _args: &[serde_json::Value],
1439        ) -> Result<Vec<std::collections::HashMap<String, serde_json::Value>>> {
1440            Ok(vec![])
1441        }
1442    }
1443
1444    fn test_schema() -> CompiledSchema {
1445        let mut schema = CompiledSchema::new();
1446        schema.queries.push(QueryDefinition {
1447            name:         "users".to_string(),
1448            return_type:  "User".to_string(),
1449            returns_list: true,
1450            nullable:     false,
1451            arguments:    Vec::new(),
1452            sql_source:   Some("v_user".to_string()),
1453            description:  None,
1454            auto_params:  AutoParams::default(),
1455            deprecation:  None,
1456            jsonb_column: "data".to_string(),
1457        });
1458        schema
1459    }
1460
1461    fn mock_user_results() -> Vec<JsonbValue> {
1462        vec![
1463            JsonbValue::new(serde_json::json!({"id": "1", "name": "Alice"})),
1464            JsonbValue::new(serde_json::json!({"id": "2", "name": "Bob"})),
1465        ]
1466    }
1467
1468    #[tokio::test]
1469    async fn test_executor_new() {
1470        let schema = test_schema();
1471        let adapter = Arc::new(MockAdapter::new(vec![]));
1472        let executor = Executor::new(schema, adapter);
1473
1474        assert_eq!(executor.schema().queries.len(), 1);
1475    }
1476
1477    #[tokio::test]
1478    async fn test_execute_query() {
1479        let schema = test_schema();
1480        let adapter = Arc::new(MockAdapter::new(mock_user_results()));
1481        let executor = Executor::new(schema, adapter);
1482
1483        let query = "{ users { id name } }";
1484        let result = executor.execute(query, None).await.unwrap();
1485
1486        assert!(result.contains("\"data\""));
1487        assert!(result.contains("\"users\""));
1488        assert!(result.contains("\"id\""));
1489        assert!(result.contains("\"name\""));
1490    }
1491
1492    #[tokio::test]
1493    async fn test_execute_json() {
1494        let schema = test_schema();
1495        let adapter = Arc::new(MockAdapter::new(mock_user_results()));
1496        let executor = Executor::new(schema, adapter);
1497
1498        let query = "{ users { id name } }";
1499        let result = executor.execute_json(query, None).await.unwrap();
1500
1501        assert!(result.get("data").is_some());
1502        assert!(result["data"].get("users").is_some());
1503    }
1504
1505    #[tokio::test]
1506    async fn test_executor_with_config() {
1507        let schema = test_schema();
1508        let adapter = Arc::new(MockAdapter::new(vec![]));
1509        let config = RuntimeConfig {
1510            cache_query_plans:    false,
1511            max_query_depth:      5,
1512            max_query_complexity: 500,
1513            enable_tracing:       true,
1514            field_filter:         None,
1515            rls_policy:           None,
1516            query_timeout_ms:     30_000,
1517            jsonb_optimization:   JsonbOptimizationOptions::default(),
1518        };
1519
1520        let executor = Executor::with_config(schema, adapter, config);
1521
1522        assert!(!executor.config().cache_query_plans);
1523        assert_eq!(executor.config().max_query_depth, 5);
1524        assert!(executor.config().enable_tracing);
1525    }
1526
1527    #[tokio::test]
1528    async fn test_introspection_schema_query() {
1529        let schema = test_schema();
1530        let adapter = Arc::new(MockAdapter::new(vec![]));
1531        let executor = Executor::new(schema, adapter);
1532
1533        let query = r"{ __schema { queryType { name } } }";
1534        let result = executor.execute(query, None).await.unwrap();
1535
1536        assert!(result.contains("__schema"));
1537        assert!(result.contains("Query"));
1538    }
1539
1540    #[tokio::test]
1541    async fn test_introspection_type_query() {
1542        let schema = test_schema();
1543        let adapter = Arc::new(MockAdapter::new(vec![]));
1544        let executor = Executor::new(schema, adapter);
1545
1546        let query = r#"{ __type(name: "Int") { kind name } }"#;
1547        let result = executor.execute(query, None).await.unwrap();
1548
1549        assert!(result.contains("__type"));
1550        assert!(result.contains("Int"));
1551    }
1552
1553    #[tokio::test]
1554    async fn test_introspection_unknown_type() {
1555        let schema = test_schema();
1556        let adapter = Arc::new(MockAdapter::new(vec![]));
1557        let executor = Executor::new(schema, adapter);
1558
1559        let query = r#"{ __type(name: "UnknownType") { kind name } }"#;
1560        let result = executor.execute(query, None).await.unwrap();
1561
1562        // Unknown type returns null
1563        assert!(result.contains("null"));
1564    }
1565
1566    #[test]
1567    fn test_detect_introspection_schema() {
1568        let schema = test_schema();
1569        let adapter = Arc::new(MockAdapter::new(vec![]));
1570        let executor = Executor::new(schema, adapter);
1571
1572        let query = r"{ __schema { types { name } } }";
1573        let query_type = executor.classify_query(query).unwrap();
1574        assert_eq!(query_type, QueryType::IntrospectionSchema);
1575    }
1576
/// A query rooted at `__type(name: ...)` must be classified as
/// `QueryType::IntrospectionType`, carrying the requested type name.
#[test]
fn test_detect_introspection_type() {
    let executor = Executor::new(test_schema(), Arc::new(MockAdapter::new(vec![])));

    let classified = executor.classify_query(r#"{ __type(name: "User") { fields { name } } }"#);

    // The variant payload is the value of the `name` argument.
    assert_eq!(classified.unwrap(), QueryType::IntrospectionType(String::from("User")));
}
1587
/// `extract_type_argument` must pull the `name` argument out of a `__type`
/// query regardless of quote style or spacing after the colon.
#[test]
fn test_extract_type_argument() {
    let executor = Executor::new(test_schema(), Arc::new(MockAdapter::new(vec![])));

    // Each entry pairs a query with the type name expected to be extracted.
    let cases = [
        // Double quotes
        (r#"{ __type(name: "User") { name } }"#, "User"),
        // Single quotes
        (r"{ __type(name: 'Product') { name } }", "Product"),
        // No space after colon
        (r#"{ __type(name:"Query") { name } }"#, "Query"),
    ];

    for &(query, expected) in &cases {
        assert_eq!(executor.extract_type_argument(query), Some(expected.to_string()));
    }
}
1606
1607    // ==================== ExecutionContext Tests ====================
1608
/// A freshly constructed `ExecutionContext` reports the id it was given and
/// is not cancelled.
#[test]
fn test_execution_context_creation() {
    let fresh = ExecutionContext::new(String::from("query-123"));

    assert!(!fresh.is_cancelled());
    assert_eq!(fresh.query_id(), "query-123");
}
1615
/// Cancelling the token handed out by the context is visible both on the
/// token itself and through `ExecutionContext::is_cancelled`.
#[test]
fn test_execution_context_cancellation_token() {
    let context = ExecutionContext::new(String::from("query-456"));
    let handle = context.cancellation_token();

    // Nothing has been cancelled yet.
    assert!(!handle.is_cancelled());

    handle.cancel();

    // The token and its owning context must agree after cancellation.
    assert!(handle.is_cancelled());
    assert!(context.is_cancelled());
}
1627
1628    #[tokio::test]
1629    async fn test_execute_with_context_success() {
1630        let schema = test_schema();
1631        let adapter = Arc::new(MockAdapter::new(vec![]));
1632        let executor = Executor::new(schema, adapter);
1633
1634        let ctx = ExecutionContext::new("test-query-1".to_string());
1635        let query = r"{ __schema { queryType { name } } }";
1636
1637        let result = executor.execute_with_context(query, None, &ctx).await;
1638        assert!(result.is_ok());
1639        assert!(result.unwrap().contains("__schema"));
1640    }
1641
1642    #[tokio::test]
1643    async fn test_execute_with_context_already_cancelled() {
1644        let schema = test_schema();
1645        let adapter = Arc::new(MockAdapter::new(vec![]));
1646        let executor = Executor::new(schema, adapter);
1647
1648        let ctx = ExecutionContext::new("test-query-2".to_string());
1649        let token = ctx.cancellation_token().clone();
1650
1651        // Cancel before execution
1652        token.cancel();
1653
1654        let query = r"{ __schema { queryType { name } } }";
1655        let result = executor.execute_with_context(query, None, &ctx).await;
1656
1657        assert!(result.is_err());
1658        match result.unwrap_err() {
1659            FraiseQLError::Cancelled { query_id, reason } => {
1660                assert_eq!(query_id, "test-query-2");
1661                assert!(reason.contains("before execution"));
1662            },
1663            e => panic!("Expected Cancelled error, got: {}", e),
1664        }
1665    }
1666
1667    #[tokio::test]
1668    async fn test_execute_with_context_cancelled_during_execution() {
1669        let schema = test_schema();
1670        let adapter = Arc::new(MockAdapter::new(vec![]));
1671        let executor = Executor::new(schema, adapter);
1672
1673        let ctx = ExecutionContext::new("test-query-3".to_string());
1674        let token = ctx.cancellation_token().clone();
1675
1676        // Spawn a task to cancel after a short delay
1677        tokio::spawn(async move {
1678            tokio::time::sleep(Duration::from_millis(10)).await;
1679            token.cancel();
1680        });
1681
1682        let query = r"{ __schema { queryType { name } } }";
1683        let result = executor.execute_with_context(query, None, &ctx).await;
1684
1685        // Depending on timing, may succeed or be cancelled (both are acceptable)
1686        // But if cancelled, it should be our error
1687        if let Err(FraiseQLError::Cancelled { query_id, .. }) = result {
1688            assert_eq!(query_id, "test-query-3");
1689        }
1690    }
1691
/// A cloned `ExecutionContext` keeps the same query id and observes
/// cancellation triggered through the original.
#[test]
fn test_execution_context_clone() {
    let original = ExecutionContext::new(String::from("query-clone"));
    let duplicate = original.clone();

    assert_eq!(original.query_id(), duplicate.query_id());
    assert!(!duplicate.is_cancelled());

    // Cancel via the original; the clone is expected to see it as well.
    original.cancellation_token().cancel();
    assert!(duplicate.is_cancelled());
}
1706
/// Pins the observable properties of an error built with
/// `FraiseQLError::cancelled`: display text, status code, error code, and
/// the retryable / server-error classification.
#[test]
fn test_error_cancelled_constructor() {
    let cancelled = FraiseQLError::cancelled("query-001", "user requested cancellation");

    let rendered = cancelled.to_string();
    assert!(rendered.contains("Query cancelled"));

    assert_eq!(cancelled.error_code(), "CANCELLED");
    assert_eq!(cancelled.status_code(), 408);

    // NOTE(review): 408 is a 4xx status, yet the error is expected to report
    // `is_server_error()` — confirm this classification is intentional.
    assert!(cancelled.is_server_error());
    assert!(cancelled.is_retryable());
}
1717
// ========================================================================
// JSONB Strategy Configuration Tests
// ========================================================================
1721
/// A `RuntimeConfig` built with `JsonbOptimizationOptions::default()` exposes
/// the `Project` strategy and an auto threshold of 80 percent.
#[test]
fn test_jsonb_strategy_in_runtime_config() {
    let default_options = JsonbOptimizationOptions::default();

    // Every field is spelled out; only `jsonb_optimization` matters to this test.
    let runtime_config = RuntimeConfig {
        jsonb_optimization:   default_options,
        query_timeout_ms:     30_000,
        rls_policy:           None,
        field_filter:         None,
        enable_tracing:       true,
        max_query_complexity: 500,
        max_query_depth:      5,
        cache_query_plans:    false,
    };

    let jsonb = &runtime_config.jsonb_optimization;
    assert_eq!(jsonb.default_strategy, JsonbStrategy::Project);
    assert_eq!(jsonb.auto_threshold_percent, 80);
}
1739
/// Custom `JsonbOptimizationOptions` placed into a `RuntimeConfig` are read
/// back unchanged.
#[test]
fn test_jsonb_strategy_custom_config() {
    // Non-default options are supplied inline as the config's JSONB settings.
    let runtime_config = RuntimeConfig {
        jsonb_optimization:   JsonbOptimizationOptions {
            default_strategy:       JsonbStrategy::Stream,
            auto_threshold_percent: 50,
        },
        query_timeout_ms:     30_000,
        rls_policy:           None,
        field_filter:         None,
        enable_tracing:       true,
        max_query_complexity: 500,
        max_query_depth:      5,
        cache_query_plans:    false,
    };

    let jsonb = &runtime_config.jsonb_optimization;
    assert_eq!(jsonb.default_strategy, JsonbStrategy::Stream);
    assert_eq!(jsonb.auto_threshold_percent, 50);
}
1762}