1use std::{sync::Arc, time::Duration};
18
19use super::{
20 ExecutionContext, JsonbStrategy, QueryMatcher, QueryPlanner, ResultProjector, RuntimeConfig,
21 filter_fields,
22 mutation_result::{MutationOutcome, parse_mutation_row, populate_error_fields},
23};
24#[cfg(test)]
25use crate::db::types::{DatabaseType, PoolMetrics};
26use crate::{
27 db::{WhereClause, projection_generator::PostgresProjectionGenerator, traits::DatabaseAdapter},
28 error::{FraiseQLError, Result},
29 graphql::parse_query,
30 schema::{CompiledSchema, IntrospectionResponses, SecurityConfig, SqlProjectionHint},
31 security::{FieldAccessError, SecurityContext},
32};
33
/// Coarse classification of an incoming request.
///
/// Produced by `Executor::classify_query` and used to pick the execution path.
#[derive(Debug, Clone, PartialEq)]
enum QueryType {
    /// Fallback classification when none of the other rules match.
    Regular,

    /// Root field name ends with `_aggregate`; carries the full root field name.
    Aggregate(String),

    /// Root field name ends with `_window`; carries the full root field name.
    Window(String),

    /// Federation request; carries either `"_service"` or `"_entities"`.
    Federation(String),

    /// Schema introspection (`__schema`); also used when a `__type` request has
    /// no extractable `name` argument.
    IntrospectionSchema,

    /// `__type(name: ...)` introspection; carries the requested type name.
    IntrospectionType(String),

    /// Operation whose type is `mutation`; carries the root field name.
    Mutation(String),
}
63
/// Executes GraphQL requests against a compiled schema through a database adapter.
///
/// Generic over the adapter type `A` so different `DatabaseAdapter`
/// implementations (including test mocks) can be plugged in.
pub struct Executor<A: DatabaseAdapter> {
    /// Compiled schema the executor serves (queries, mutations, types, ...).
    schema: CompiledSchema,

    /// Shared handle to the database adapter used for every database call.
    adapter: Arc<A>,

    /// Matches an incoming query string against the schema's query definitions.
    matcher: QueryMatcher,

    /// Builds execution plans; constructed from `config.cache_query_plans`.
    planner: QueryPlanner,

    /// Runtime settings (timeout, field filter, RLS policy, ...).
    config: RuntimeConfig,

    /// Introspection responses built once from the schema at construction time.
    introspection: IntrospectionResponses,
}
125
126impl<A: DatabaseAdapter> Executor<A> {
127 #[must_use]
142 pub fn new(schema: CompiledSchema, adapter: Arc<A>) -> Self {
143 Self::with_config(schema, adapter, RuntimeConfig::default())
144 }
145
146 #[must_use]
154 pub fn with_config(schema: CompiledSchema, adapter: Arc<A>, config: RuntimeConfig) -> Self {
155 let matcher = QueryMatcher::new(schema.clone());
156 let planner = QueryPlanner::new(config.cache_query_plans);
157 let introspection = IntrospectionResponses::build(&schema);
159
160 Self {
161 schema,
162 adapter,
163 matcher,
164 planner,
165 config,
166 introspection,
167 }
168 }
169
170 pub async fn execute(
197 &self,
198 query: &str,
199 variables: Option<&serde_json::Value>,
200 ) -> Result<String> {
201 if self.config.query_timeout_ms > 0 {
203 let timeout_duration = Duration::from_millis(self.config.query_timeout_ms);
204 tokio::time::timeout(timeout_duration, self.execute_internal(query, variables))
205 .await
206 .map_err(|_| {
207 let query_snippet = if query.len() > 100 {
209 format!("{}...", &query[..100])
210 } else {
211 query.to_string()
212 };
213 FraiseQLError::Timeout {
214 timeout_ms: self.config.query_timeout_ms,
215 query: Some(query_snippet),
216 }
217 })?
218 } else {
219 self.execute_internal(query, variables).await
220 }
221 }
222
223 async fn execute_internal(
225 &self,
226 query: &str,
227 variables: Option<&serde_json::Value>,
228 ) -> Result<String> {
229 let query_type = self.classify_query(query)?;
231
232 match query_type {
234 QueryType::Regular => self.execute_regular_query(query, variables).await,
235 QueryType::Aggregate(query_name) => {
236 self.execute_aggregate_dispatch(&query_name, variables).await
237 },
238 QueryType::Window(query_name) => {
239 self.execute_window_dispatch(&query_name, variables).await
240 },
241 QueryType::Federation(query_name) => {
242 self.execute_federation_query(&query_name, query, variables).await
243 },
244 QueryType::IntrospectionSchema => {
245 Ok(self.introspection.schema_response.clone())
247 },
248 QueryType::IntrospectionType(type_name) => {
249 Ok(self.introspection.get_type_response(&type_name))
251 },
252 QueryType::Mutation(mutation_name) => {
253 self.execute_mutation_query(&mutation_name, variables).await
254 },
255 }
256 }
257
258 pub async fn execute_with_scopes(
282 &self,
283 query: &str,
284 variables: Option<&serde_json::Value>,
285 user_scopes: &[String],
286 ) -> Result<String> {
287 let query_type = self.classify_query(query)?;
289
290 if let Some(ref filter) = self.config.field_filter {
292 if matches!(query_type, QueryType::Regular) {
294 self.validate_field_access(query, variables, user_scopes, filter)?;
295 }
296 }
297
298 match query_type {
300 QueryType::Regular => self.execute_regular_query(query, variables).await,
301 QueryType::Aggregate(query_name) => {
302 self.execute_aggregate_dispatch(&query_name, variables).await
303 },
304 QueryType::Window(query_name) => {
305 self.execute_window_dispatch(&query_name, variables).await
306 },
307 QueryType::Federation(query_name) => {
308 self.execute_federation_query(&query_name, query, variables).await
309 },
310 QueryType::IntrospectionSchema => Ok(self.introspection.schema_response.clone()),
311 QueryType::IntrospectionType(type_name) => {
312 Ok(self.introspection.get_type_response(&type_name))
313 },
314 QueryType::Mutation(mutation_name) => {
315 self.execute_mutation_query(&mutation_name, variables).await
316 },
317 }
318 }
319
320 fn validate_field_access(
322 &self,
323 query: &str,
324 variables: Option<&serde_json::Value>,
325 user_scopes: &[String],
326 filter: &crate::security::FieldFilter,
327 ) -> Result<()> {
328 let query_match = self.matcher.match_query(query, variables)?;
330
331 let type_name = &query_match.query_def.return_type;
333
334 let field_refs: Vec<&str> = query_match.fields.iter().map(String::as_str).collect();
336 let errors = filter.validate_fields(type_name, &field_refs, user_scopes);
337
338 if errors.is_empty() {
339 Ok(())
340 } else {
341 let first_error = &errors[0];
343 Err(FraiseQLError::Authorization {
344 message: first_error.message.clone(),
345 action: Some("read".to_string()),
346 resource: Some(format!("{}.{}", first_error.type_name, first_error.field_name)),
347 })
348 }
349 }
350
351 pub async fn execute_with_context(
389 &self,
390 query: &str,
391 variables: Option<&serde_json::Value>,
392 ctx: &ExecutionContext,
393 ) -> Result<String> {
394 if ctx.is_cancelled() {
396 return Err(FraiseQLError::cancelled(
397 ctx.query_id().to_string(),
398 "Query cancelled before execution".to_string(),
399 ));
400 }
401
402 let token = ctx.cancellation_token().clone();
403
404 tokio::select! {
406 result = self.execute(query, variables) => {
407 result
408 }
409 () = token.cancelled() => {
410 Err(FraiseQLError::cancelled(
411 ctx.query_id().to_string(),
412 "Query cancelled during execution".to_string(),
413 ))
414 }
415 }
416 }
417
418 pub async fn execute_with_security(
454 &self,
455 query: &str,
456 variables: Option<&serde_json::Value>,
457 security_context: &SecurityContext,
458 ) -> Result<String> {
459 if self.config.query_timeout_ms > 0 {
461 let timeout_duration = Duration::from_millis(self.config.query_timeout_ms);
462 tokio::time::timeout(
463 timeout_duration,
464 self.execute_with_security_internal(query, variables, security_context),
465 )
466 .await
467 .map_err(|_| {
468 let query_snippet = if query.len() > 100 {
469 format!("{}...", &query[..100])
470 } else {
471 query.to_string()
472 };
473 FraiseQLError::Timeout {
474 timeout_ms: self.config.query_timeout_ms,
475 query: Some(query_snippet),
476 }
477 })?
478 } else {
479 self.execute_with_security_internal(query, variables, security_context).await
480 }
481 }
482
483 async fn execute_with_security_internal(
486 &self,
487 query: &str,
488 variables: Option<&serde_json::Value>,
489 security_context: &SecurityContext,
490 ) -> Result<String> {
491 let query_type = self.classify_query(query)?;
493
494 match query_type {
496 QueryType::Regular => {
497 self.execute_regular_query_with_security(query, variables, security_context)
498 .await
499 },
500 QueryType::Aggregate(query_name) => {
502 self.execute_aggregate_dispatch(&query_name, variables).await
503 },
504 QueryType::Window(query_name) => {
505 self.execute_window_dispatch(&query_name, variables).await
506 },
507 QueryType::Federation(query_name) => {
508 self.execute_federation_query(&query_name, query, variables).await
509 },
510 QueryType::IntrospectionSchema => Ok(self.introspection.schema_response.clone()),
511 QueryType::IntrospectionType(type_name) => {
512 Ok(self.introspection.get_type_response(&type_name))
513 },
514 QueryType::Mutation(mutation_name) => {
515 self.execute_mutation_query(&mutation_name, variables).await
516 },
517 }
518 }
519
520 pub fn check_field_access(
534 &self,
535 type_name: &str,
536 field_name: &str,
537 user_scopes: &[String],
538 ) -> std::result::Result<(), FieldAccessError> {
539 if let Some(ref filter) = self.config.field_filter {
540 filter.can_access(type_name, field_name, user_scopes)
541 } else {
542 Ok(())
544 }
545 }
546
547 fn apply_field_rbac_filtering(
562 &self,
563 return_type: &str,
564 projection_fields: Vec<String>,
565 security_context: &SecurityContext,
566 ) -> Result<Vec<String>> {
567 if let Some(ref security_json) = self.schema.security {
569 let security_config: SecurityConfig = serde_json::from_value(security_json.clone())
571 .map_err(|_| FraiseQLError::Validation {
572 message: "Invalid security configuration in compiled schema".to_string(),
573 path: Some("schema.security".to_string()),
574 })?;
575
576 if let Some(type_def) = self.schema.types.iter().find(|t| t.name == return_type) {
578 let accessible_fields =
580 filter_fields(security_context, &security_config, &type_def.fields);
581
582 let accessible_names: std::collections::HashSet<String> =
584 accessible_fields.iter().map(|f| f.name.clone()).collect();
585
586 let filtered: Vec<String> = projection_fields
587 .into_iter()
588 .filter(|name| accessible_names.contains(name))
589 .collect();
590
591 return Ok(filtered);
592 }
593 }
594
595 Ok(projection_fields)
597 }
598
    /// Executes a regular query with row-level security and field-level RBAC.
    ///
    /// Steps, in order: reject expired security contexts, match and plan the
    /// query, evaluate the optional RLS policy into a WHERE clause, run the
    /// query through the adapter, filter the projected fields by RBAC, then
    /// project and wrap the rows in a `data` envelope.
    ///
    /// # Errors
    ///
    /// Returns `FraiseQLError::Validation` when the security context is
    /// expired or the matched query has no SQL source; otherwise propagates
    /// errors from matching, planning, RLS evaluation, the adapter, RBAC
    /// filtering, projection or serialization.
    async fn execute_regular_query_with_security(
        &self,
        query: &str,
        variables: Option<&serde_json::Value>,
        security_context: &SecurityContext,
    ) -> Result<String> {
        // Fail fast before doing any matching or database work.
        if security_context.is_expired() {
            return Err(FraiseQLError::Validation {
                message: "Security token has expired".to_string(),
                path: Some("request.authorization".to_string()),
            });
        }

        let query_match = self.matcher.match_query(query, variables)?;

        let plan = self.planner.plan(&query_match)?;

        // Row-level security: the policy (if configured) may yield an extra
        // WHERE clause for this caller and query name.
        let rls_where_clause: Option<WhereClause> =
            if let Some(ref rls_policy) = self.config.rls_policy {
                rls_policy.evaluate(security_context, &query_match.query_def.name)?
            } else {
                None
            };

        let sql_source =
            query_match
                .query_def
                .sql_source
                .as_ref()
                .ok_or_else(|| FraiseQLError::Validation {
                    message: "Query has no SQL source".to_string(),
                    path: None,
                })?;

        // A projection hint is only built when the plan has fields to project
        // and selected the `Project` JSONB strategy.
        let projection_hint = if !plan.projection_fields.is_empty()
            && plan.jsonb_strategy == JsonbStrategy::Project
        {
            let generator = PostgresProjectionGenerator::new();
            // If projection SQL cannot be generated, fall back to the literal
            // `data` template instead of failing the query.
            let projection_sql = generator
                .generate_projection_sql(&plan.projection_fields)
                .unwrap_or_else(|_| "data".to_string());

            Some(SqlProjectionHint {
                database: "postgresql".to_string(),
                projection_template: projection_sql,
                estimated_reduction_percent: 50,
            })
        } else {
            None
        };

        let results = self
            .adapter
            .execute_with_projection(
                sql_source,
                projection_hint.as_ref(),
                rls_where_clause.as_ref(),
                None,
            )
            .await?;

        // Field-level RBAC is applied to the projection list after the rows
        // are fetched; the projection hint above used the unfiltered list.
        let filtered_projection_fields = self.apply_field_rbac_filtering(
            &query_match.query_def.return_type,
            plan.projection_fields,
            security_context,
        )?;

        let projector = ResultProjector::new(filtered_projection_fields);
        let projected = projector.project_results(&results, query_match.query_def.returns_list)?;

        let response =
            ResultProjector::wrap_in_data_envelope(projected, &query_match.query_def.name);

        Ok(serde_json::to_string(&response)?)
    }
704
705 async fn execute_regular_query(
706 &self,
707 query: &str,
708 variables: Option<&serde_json::Value>,
709 ) -> Result<String> {
710 let query_match = self.matcher.match_query(query, variables)?;
712
713 let plan = self.planner.plan(&query_match)?;
715
716 let sql_source = query_match.query_def.sql_source.as_ref().ok_or_else(|| {
718 crate::error::FraiseQLError::Validation {
719 message: "Query has no SQL source".to_string(),
720 path: None,
721 }
722 })?;
723
724 let projection_hint = if !plan.projection_fields.is_empty()
728 && plan.jsonb_strategy == JsonbStrategy::Project
729 {
730 let generator = PostgresProjectionGenerator::new();
731 let projection_sql = generator
732 .generate_projection_sql(&plan.projection_fields)
733 .unwrap_or_else(|_| "data".to_string());
734
735 Some(SqlProjectionHint {
736 database: "postgresql".to_string(),
737 projection_template: projection_sql,
738 estimated_reduction_percent: 50,
739 })
740 } else {
741 None
743 };
744
745 let results = self
746 .adapter
747 .execute_with_projection(sql_source, projection_hint.as_ref(), None, None)
748 .await?;
749
750 let projector = ResultProjector::new(plan.projection_fields);
752 let projected = projector.project_results(&results, query_match.query_def.returns_list)?;
753
754 let response =
756 ResultProjector::wrap_in_data_envelope(projected, &query_match.query_def.name);
757
758 Ok(serde_json::to_string(&response)?)
760 }
761
    /// Executes a mutation by calling its configured SQL function and shaping
    /// the first returned row into a GraphQL response.
    ///
    /// Arguments are passed positionally in the order declared on the mutation
    /// definition; an argument absent from `variables` is sent as JSON `null`.
    ///
    /// # Errors
    ///
    /// Returns `FraiseQLError::Validation` when the mutation is unknown, has
    /// no `sql_source`, or the function returns no rows; otherwise propagates
    /// adapter, row-parsing and serialization errors.
    async fn execute_mutation_query(
        &self,
        mutation_name: &str,
        variables: Option<&serde_json::Value>,
    ) -> Result<String> {
        let mutation_def =
            self.schema.find_mutation(mutation_name).ok_or_else(|| {
                FraiseQLError::Validation {
                    message: format!("Unknown mutation: {mutation_name}"),
                    path: None,
                }
            })?;

        let sql_source = mutation_def.sql_source.as_deref().ok_or_else(|| {
            FraiseQLError::Validation {
                message: format!("Mutation '{mutation_name}' has no sql_source configured"),
                path: None,
            }
        })?;

        // Build the positional argument list; non-object `variables` behave
        // like no variables at all.
        let vars_obj = variables.and_then(|v| v.as_object());
        let args: Vec<serde_json::Value> = mutation_def
            .arguments
            .iter()
            .map(|arg| {
                vars_obj
                    .and_then(|obj| obj.get(&arg.name))
                    .cloned()
                    .unwrap_or(serde_json::Value::Null)
            })
            .collect();

        let rows = self.adapter.execute_function_call(sql_source, &args).await?;

        // Only the first row is considered; any further rows are ignored.
        let row = rows.into_iter().next().ok_or_else(|| FraiseQLError::Validation {
            message: format!(
                "Mutation '{mutation_name}': function returned no rows"
            ),
            path: None,
        })?;

        let outcome = parse_mutation_row(&row)?;

        let mutation_return_type = mutation_def.return_type.clone();
        let mutation_name_owned = mutation_name.to_string();

        let result_json = match outcome {
            MutationOutcome::Success { entity, entity_type, .. } => {
                // `__typename` resolution order: the entity type reported by
                // the row, then the first member of the return union that is
                // not an error type (members unknown to the schema count as
                // non-error), then the mutation's declared return type.
                let typename = entity_type
                    .or_else(|| {
                        self.schema
                            .find_union(&mutation_return_type)
                            .and_then(|u| {
                                u.member_types.iter().find(|t| {
                                    self.schema
                                        .find_type(t)
                                        .map(|td| !td.is_error)
                                        .unwrap_or(true)
                                })
                            })
                            .cloned()
                    })
                    .unwrap_or_else(|| mutation_return_type.clone());

                // A non-object entity yields an empty object carrying only
                // `__typename`.
                let mut obj = entity
                    .as_object()
                    .cloned()
                    .unwrap_or_default();
                obj.insert(
                    "__typename".to_string(),
                    serde_json::Value::String(typename),
                );
                serde_json::Value::Object(obj)
            },
            MutationOutcome::Error { status, metadata, .. } => {
                // Pick the first member of the return union that the schema
                // marks as an error type.
                let error_type = self
                    .schema
                    .find_union(&mutation_return_type)
                    .and_then(|u| {
                        u.member_types.iter().find_map(|t| {
                            let td = self.schema.find_type(t)?;
                            if td.is_error { Some(td) } else { None }
                        })
                    });

                match error_type {
                    Some(td) => {
                        let mut fields =
                            populate_error_fields(&td.fields, &metadata);
                        fields.insert(
                            "__typename".to_string(),
                            serde_json::Value::String(td.name.clone()),
                        );
                        // Inserted last, so it replaces any `status` entry
                        // produced by `populate_error_fields`.
                        fields.insert(
                            "status".to_string(),
                            serde_json::Value::String(status),
                        );
                        serde_json::Value::Object(fields)
                    },
                    None => {
                        // No error type found: report the status under the
                        // mutation's declared return type.
                        serde_json::json!({ "__typename": mutation_return_type, "status": status })
                    },
                }
            },
        };

        let response = ResultProjector::wrap_in_data_envelope(result_json, &mutation_name_owned);
        Ok(serde_json::to_string(&response)?)
    }
888
889 fn classify_query(&self, query: &str) -> Result<QueryType> {
891 if let Some(introspection_type) = self.detect_introspection(query) {
893 return Ok(introspection_type);
894 }
895
896 if let Some(federation_type) = self.detect_federation(query) {
898 return Ok(federation_type);
899 }
900
901 let parsed = parse_query(query).map_err(|e| FraiseQLError::Parse {
903 message: e.to_string(),
904 location: "query".to_string(),
905 })?;
906
907 let root_field = &parsed.root_field;
908
909 if parsed.operation_type == "mutation" {
911 return Ok(QueryType::Mutation(root_field.clone()));
912 }
913
914 if root_field.ends_with("_aggregate") {
916 return Ok(QueryType::Aggregate(root_field.clone()));
917 }
918
919 if root_field.ends_with("_window") {
921 return Ok(QueryType::Window(root_field.clone()));
922 }
923
924 Ok(QueryType::Regular)
926 }
927
928 fn detect_introspection(&self, query: &str) -> Option<QueryType> {
932 let query_trimmed = query.trim();
933
934 if query_trimmed.contains("__schema") {
936 return Some(QueryType::IntrospectionSchema);
937 }
938
939 if query_trimmed.contains("__type") {
941 if let Some(type_name) = self.extract_type_argument(query_trimmed) {
943 return Some(QueryType::IntrospectionType(type_name));
944 }
945 return Some(QueryType::IntrospectionSchema);
947 }
948
949 None
950 }
951
952 fn detect_federation(&self, query: &str) -> Option<QueryType> {
956 let query_trimmed = query.trim();
957
958 if query_trimmed.contains("_service") {
960 return Some(QueryType::Federation("_service".to_string()));
961 }
962
963 if query_trimmed.contains("_entities") {
965 return Some(QueryType::Federation("_entities".to_string()));
966 }
967
968 None
969 }
970
971 fn extract_type_argument(&self, query: &str) -> Option<String> {
973 let type_pos = query.find("__type")?;
976 let after_type = &query[type_pos + 6..];
977
978 let paren_pos = after_type.find('(')?;
980 let after_paren = &after_type[paren_pos + 1..];
981
982 let name_pos = after_paren.find("name")?;
984 let after_name = &after_paren[name_pos + 4..].trim_start();
985
986 let after_colon = if let Some(stripped) = after_name.strip_prefix(':') {
988 stripped.trim_start()
989 } else {
990 after_name
991 };
992
993 let quote_char = after_colon.chars().next()?;
995 if quote_char != '"' && quote_char != '\'' {
996 return None;
997 }
998
999 let after_quote = &after_colon[1..];
1000 let end_quote = after_quote.find(quote_char)?;
1001 Some(after_quote[..end_quote].to_string())
1002 }
1003
1004 async fn execute_aggregate_dispatch(
1006 &self,
1007 query_name: &str,
1008 variables: Option<&serde_json::Value>,
1009 ) -> Result<String> {
1010 let table_name =
1012 query_name.strip_suffix("_aggregate").ok_or_else(|| FraiseQLError::Validation {
1013 message: format!("Invalid aggregate query name: {}", query_name),
1014 path: None,
1015 })?;
1016
1017 let fact_table_name = format!("tf_{}", table_name);
1018
1019 let metadata_json = self.schema.get_fact_table(&fact_table_name).ok_or_else(|| {
1021 FraiseQLError::Validation {
1022 message: format!("Fact table '{}' not found in schema", fact_table_name),
1023 path: Some(format!("fact_tables.{}", fact_table_name)),
1024 }
1025 })?;
1026
1027 let metadata: crate::compiler::fact_table::FactTableMetadata =
1029 serde_json::from_value(metadata_json.clone())?;
1030
1031 let empty_json = serde_json::json!({});
1033 let query_json = variables.unwrap_or(&empty_json);
1034
1035 self.execute_aggregate_query(query_json, query_name, &metadata).await
1037 }
1038
1039 async fn execute_window_dispatch(
1041 &self,
1042 query_name: &str,
1043 variables: Option<&serde_json::Value>,
1044 ) -> Result<String> {
1045 let table_name =
1047 query_name.strip_suffix("_window").ok_or_else(|| FraiseQLError::Validation {
1048 message: format!("Invalid window query name: {}", query_name),
1049 path: None,
1050 })?;
1051
1052 let fact_table_name = format!("tf_{}", table_name);
1053
1054 let metadata_json = self.schema.get_fact_table(&fact_table_name).ok_or_else(|| {
1056 FraiseQLError::Validation {
1057 message: format!("Fact table '{}' not found in schema", fact_table_name),
1058 path: Some(format!("fact_tables.{}", fact_table_name)),
1059 }
1060 })?;
1061
1062 let metadata: crate::compiler::fact_table::FactTableMetadata =
1064 serde_json::from_value(metadata_json.clone())?;
1065
1066 let empty_json = serde_json::json!({});
1068 let query_json = variables.unwrap_or(&empty_json);
1069
1070 self.execute_window_query(query_json, query_name, &metadata).await
1072 }
1073
1074 async fn execute_federation_query(
1076 &self,
1077 query_name: &str,
1078 query: &str,
1079 variables: Option<&serde_json::Value>,
1080 ) -> Result<String> {
1081 match query_name {
1082 "_service" => self.execute_service_query().await,
1083 "_entities" => self.execute_entities_query(query, variables).await,
1084 _ => Err(FraiseQLError::Validation {
1085 message: format!("Unknown federation query: {}", query_name),
1086 path: None,
1087 }),
1088 }
1089 }
1090
1091 async fn execute_service_query(&self) -> Result<String> {
1093 let fed_metadata =
1095 self.schema.federation_metadata().ok_or_else(|| FraiseQLError::Validation {
1096 message: "Federation not enabled in schema".to_string(),
1097 path: None,
1098 })?;
1099
1100 let raw_schema = self.schema.raw_schema();
1102 let sdl = crate::federation::generate_service_sdl(&raw_schema, &fed_metadata);
1103
1104 let response = serde_json::json!({
1106 "data": {
1107 "_service": {
1108 "sdl": sdl
1109 }
1110 }
1111 });
1112
1113 Ok(serde_json::to_string(&response)?)
1114 }
1115
    /// Answers the federation `_entities` query.
    ///
    /// Reads the `representations` variable, parses and validates it against
    /// the schema's federation metadata, determines the field selection from
    /// the query text, batch-loads the entities through the adapter and wraps
    /// them under `data._entities`.
    ///
    /// # Errors
    ///
    /// Returns `FraiseQLError::Validation` when federation metadata is
    /// missing, the `representations` variable is absent, or the
    /// representations fail parsing or validation; otherwise propagates
    /// loading and serialization errors.
    async fn execute_entities_query(
        &self,
        query: &str,
        variables: Option<&serde_json::Value>,
    ) -> Result<String> {
        let fed_metadata =
            self.schema.federation_metadata().ok_or_else(|| FraiseQLError::Validation {
                message: "Federation not enabled in schema".to_string(),
                path: None,
            })?;

        let representations_value =
            variables.and_then(|v| v.get("representations")).ok_or_else(|| {
                FraiseQLError::Validation {
                    message: "_entities query requires 'representations' variable".to_string(),
                    path: None,
                }
            })?;

        let representations =
            crate::federation::parse_representations(representations_value, &fed_metadata)
                .map_err(|e| FraiseQLError::Validation {
                    message: format!("Failed to parse representations: {}", e),
                    path: None,
                })?;

        // Validation errors are joined into a single message with "; ".
        crate::federation::validate_representations(&representations, &fed_metadata).map_err(
            |errors| FraiseQLError::Validation {
                message: format!("Invalid representations: {}", errors.join("; ")),
                path: None,
            },
        )?;

        let fed_resolver = crate::federation::FederationResolver::new(fed_metadata);

        // Use the fields selected in the query, always including
        // `__typename`. If the selection cannot be parsed or is empty, fall
        // back to `__typename` plus `*`.
        let selection = match crate::federation::selection_parser::parse_field_selection(query) {
            Ok(sel) if !sel.fields.is_empty() => {
                let mut fields = sel.fields;
                if !fields.contains(&"__typename".to_string()) {
                    fields.push("__typename".to_string());
                }
                crate::federation::FieldSelection::new(fields)
            },
            _ => {
                crate::federation::FieldSelection::new(vec![
                    "__typename".to_string(),
                    "*".to_string(),
                ])
            },
        };

        // A fresh trace context is created for each `_entities` request.
        let trace_context = crate::federation::FederationTraceContext::new();

        let entities = crate::federation::batch_load_entities_with_tracing(
            &representations,
            &fed_resolver,
            Arc::clone(&self.adapter),
            &selection,
            Some(trace_context),
        )
        .await?;

        let response = serde_json::json!({
            "data": {
                "_entities": entities
            }
        });

        Ok(serde_json::to_string(&response)?)
    }
1202
1203 pub async fn execute_window_query(
1242 &self,
1243 query_json: &serde_json::Value,
1244 query_name: &str,
1245 metadata: &crate::compiler::fact_table::FactTableMetadata,
1246 ) -> Result<String> {
1247 let request = super::WindowQueryParser::parse(query_json, metadata)?;
1249
1250 let plan =
1252 crate::compiler::window_functions::WindowPlanner::plan(request, metadata.clone())?;
1253
1254 let sql_generator = super::WindowSqlGenerator::new(self.adapter.database_type());
1256 let sql = sql_generator.generate(&plan)?;
1257
1258 let rows = self.adapter.execute_raw_query(&sql.complete_sql).await?;
1260
1261 let projected = super::WindowProjector::project(rows, &plan)?;
1263
1264 let response = super::WindowProjector::wrap_in_data_envelope(projected, query_name);
1266
1267 Ok(serde_json::to_string(&response)?)
1269 }
1270
1271 pub async fn execute_json(
1275 &self,
1276 query: &str,
1277 variables: Option<&serde_json::Value>,
1278 ) -> Result<serde_json::Value> {
1279 let result_str = self.execute(query, variables).await?;
1280 Ok(serde_json::from_str(&result_str)?)
1281 }
1282
1283 pub async fn execute_aggregate_query(
1317 &self,
1318 query_json: &serde_json::Value,
1319 query_name: &str,
1320 metadata: &crate::compiler::fact_table::FactTableMetadata,
1321 ) -> Result<String> {
1322 let request = super::AggregateQueryParser::parse(query_json, metadata)?;
1324
1325 let plan =
1327 crate::compiler::aggregation::AggregationPlanner::plan(request, metadata.clone())?;
1328
1329 let sql_generator = super::AggregationSqlGenerator::new(self.adapter.database_type());
1331 let sql = sql_generator.generate(&plan)?;
1332
1333 let rows = self.adapter.execute_raw_query(&sql.complete_sql).await?;
1335
1336 let projected = super::AggregationProjector::project(rows, &plan)?;
1338
1339 let response = super::AggregationProjector::wrap_in_data_envelope(projected, query_name);
1341
1342 Ok(serde_json::to_string(&response)?)
1344 }
1345
    /// Returns the compiled schema this executor serves.
    #[must_use]
    pub const fn schema(&self) -> &CompiledSchema {
        &self.schema
    }
1351
    /// Returns the runtime configuration this executor was built with.
    #[must_use]
    pub const fn config(&self) -> &RuntimeConfig {
        &self.config
    }
1357
    /// Returns the shared database adapter handle.
    #[must_use]
    pub fn adapter(&self) -> &Arc<A> {
        &self.adapter
    }
1363}
1364
1365#[cfg(test)]
1366mod tests {
1367 use async_trait::async_trait;
1368
1369 use super::*;
1370 use crate::{
1371 db::{types::JsonbValue, where_clause::WhereClause},
1372 runtime::JsonbOptimizationOptions,
1373 schema::{AutoParams, CompiledSchema, QueryDefinition},
1374 };
1375
    /// In-memory `DatabaseAdapter` used by the tests in this module.
    struct MockAdapter {
        /// Rows handed back by `execute_where_query` regardless of its arguments.
        mock_results: Vec<JsonbValue>,
    }
1380
    impl MockAdapter {
        /// Creates a mock adapter that returns `mock_results` for view queries.
        fn new(mock_results: Vec<JsonbValue>) -> Self {
            Self { mock_results }
        }
    }
1386
    // Canned `DatabaseAdapter` implementation: view queries return the stored
    // rows, raw queries and function calls return no rows.
    #[async_trait]
    impl DatabaseAdapter for MockAdapter {
        /// Ignores the projection hint and delegates to `execute_where_query`.
        async fn execute_with_projection(
            &self,
            view: &str,
            _projection: Option<&crate::schema::SqlProjectionHint>,
            where_clause: Option<&WhereClause>,
            limit: Option<u32>,
        ) -> Result<Vec<JsonbValue>> {
            self.execute_where_query(view, where_clause, limit, None).await
        }

        /// Returns a clone of the stored rows; every argument is ignored.
        async fn execute_where_query(
            &self,
            _view: &str,
            _where_clause: Option<&WhereClause>,
            _limit: Option<u32>,
            _offset: Option<u32>,
        ) -> Result<Vec<JsonbValue>> {
            Ok(self.mock_results.clone())
        }

        /// Always reports healthy.
        async fn health_check(&self) -> Result<()> {
            Ok(())
        }

        /// Always reports PostgreSQL.
        fn database_type(&self) -> DatabaseType {
            DatabaseType::PostgreSQL
        }

        /// Returns fixed metrics: one connection in total, idle.
        fn pool_metrics(&self) -> PoolMetrics {
            PoolMetrics {
                total_connections: 1,
                active_connections: 0,
                idle_connections: 1,
                waiting_requests: 0,
            }
        }

        /// Returns no rows for any raw SQL.
        async fn execute_raw_query(
            &self,
            _sql: &str,
        ) -> Result<Vec<std::collections::HashMap<String, serde_json::Value>>> {
            Ok(vec![])
        }

        /// Returns no rows for any function call.
        async fn execute_function_call(
            &self,
            _function_name: &str,
            _args: &[serde_json::Value],
        ) -> Result<Vec<std::collections::HashMap<String, serde_json::Value>>> {
            Ok(vec![])
        }
    }
1443
1444 fn test_schema() -> CompiledSchema {
1445 let mut schema = CompiledSchema::new();
1446 schema.queries.push(QueryDefinition {
1447 name: "users".to_string(),
1448 return_type: "User".to_string(),
1449 returns_list: true,
1450 nullable: false,
1451 arguments: Vec::new(),
1452 sql_source: Some("v_user".to_string()),
1453 description: None,
1454 auto_params: AutoParams::default(),
1455 deprecation: None,
1456 jsonb_column: "data".to_string(),
1457 });
1458 schema
1459 }
1460
1461 fn mock_user_results() -> Vec<JsonbValue> {
1462 vec![
1463 JsonbValue::new(serde_json::json!({"id": "1", "name": "Alice"})),
1464 JsonbValue::new(serde_json::json!({"id": "2", "name": "Bob"})),
1465 ]
1466 }
1467
1468 #[tokio::test]
1469 async fn test_executor_new() {
1470 let schema = test_schema();
1471 let adapter = Arc::new(MockAdapter::new(vec![]));
1472 let executor = Executor::new(schema, adapter);
1473
1474 assert_eq!(executor.schema().queries.len(), 1);
1475 }
1476
1477 #[tokio::test]
1478 async fn test_execute_query() {
1479 let schema = test_schema();
1480 let adapter = Arc::new(MockAdapter::new(mock_user_results()));
1481 let executor = Executor::new(schema, adapter);
1482
1483 let query = "{ users { id name } }";
1484 let result = executor.execute(query, None).await.unwrap();
1485
1486 assert!(result.contains("\"data\""));
1487 assert!(result.contains("\"users\""));
1488 assert!(result.contains("\"id\""));
1489 assert!(result.contains("\"name\""));
1490 }
1491
1492 #[tokio::test]
1493 async fn test_execute_json() {
1494 let schema = test_schema();
1495 let adapter = Arc::new(MockAdapter::new(mock_user_results()));
1496 let executor = Executor::new(schema, adapter);
1497
1498 let query = "{ users { id name } }";
1499 let result = executor.execute_json(query, None).await.unwrap();
1500
1501 assert!(result.get("data").is_some());
1502 assert!(result["data"].get("users").is_some());
1503 }
1504
1505 #[tokio::test]
1506 async fn test_executor_with_config() {
1507 let schema = test_schema();
1508 let adapter = Arc::new(MockAdapter::new(vec![]));
1509 let config = RuntimeConfig {
1510 cache_query_plans: false,
1511 max_query_depth: 5,
1512 max_query_complexity: 500,
1513 enable_tracing: true,
1514 field_filter: None,
1515 rls_policy: None,
1516 query_timeout_ms: 30_000,
1517 jsonb_optimization: JsonbOptimizationOptions::default(),
1518 };
1519
1520 let executor = Executor::with_config(schema, adapter, config);
1521
1522 assert!(!executor.config().cache_query_plans);
1523 assert_eq!(executor.config().max_query_depth, 5);
1524 assert!(executor.config().enable_tracing);
1525 }
1526
1527 #[tokio::test]
1528 async fn test_introspection_schema_query() {
1529 let schema = test_schema();
1530 let adapter = Arc::new(MockAdapter::new(vec![]));
1531 let executor = Executor::new(schema, adapter);
1532
1533 let query = r"{ __schema { queryType { name } } }";
1534 let result = executor.execute(query, None).await.unwrap();
1535
1536 assert!(result.contains("__schema"));
1537 assert!(result.contains("Query"));
1538 }
1539
1540 #[tokio::test]
1541 async fn test_introspection_type_query() {
1542 let schema = test_schema();
1543 let adapter = Arc::new(MockAdapter::new(vec![]));
1544 let executor = Executor::new(schema, adapter);
1545
1546 let query = r#"{ __type(name: "Int") { kind name } }"#;
1547 let result = executor.execute(query, None).await.unwrap();
1548
1549 assert!(result.contains("__type"));
1550 assert!(result.contains("Int"));
1551 }
1552
1553 #[tokio::test]
1554 async fn test_introspection_unknown_type() {
1555 let schema = test_schema();
1556 let adapter = Arc::new(MockAdapter::new(vec![]));
1557 let executor = Executor::new(schema, adapter);
1558
1559 let query = r#"{ __type(name: "UnknownType") { kind name } }"#;
1560 let result = executor.execute(query, None).await.unwrap();
1561
1562 assert!(result.contains("null"));
1564 }
1565
1566 #[test]
1567 fn test_detect_introspection_schema() {
1568 let schema = test_schema();
1569 let adapter = Arc::new(MockAdapter::new(vec![]));
1570 let executor = Executor::new(schema, adapter);
1571
1572 let query = r"{ __schema { types { name } } }";
1573 let query_type = executor.classify_query(query).unwrap();
1574 assert_eq!(query_type, QueryType::IntrospectionSchema);
1575 }
1576
/// Checks that `classify_query` labels a `__type(name: "User")` query as
/// `QueryType::IntrospectionType` carrying the requested type name.
#[test]
fn test_detect_introspection_type() {
    let executor = Executor::new(test_schema(), Arc::new(MockAdapter::new(vec![])));

    let classified = executor
        .classify_query(r#"{ __type(name: "User") { fields { name } } }"#)
        .unwrap();

    let expected = QueryType::IntrospectionType("User".to_string());
    assert_eq!(classified, expected);
}
1587
/// Checks that `extract_type_argument` pulls the `name` argument out of a
/// `__type(...)` query for three spellings of the argument: double-quoted,
/// single-quoted, and double-quoted with no space after the colon.
#[test]
fn test_extract_type_argument() {
    let executor = Executor::new(test_schema(), Arc::new(MockAdapter::new(vec![])));

    // (query text, type name expected to be extracted)
    let cases = [
        (r#"{ __type(name: "User") { name } }"#, "User"),
        (r"{ __type(name: 'Product') { name } }", "Product"),
        (r#"{ __type(name:"Query") { name } }"#, "Query"),
    ];

    for (query, expected) in cases {
        assert_eq!(executor.extract_type_argument(query), Some(expected.to_string()));
    }
}
1606
/// Checks that a freshly created `ExecutionContext` reports the query id it
/// was built with and is not cancelled.
#[test]
fn test_execution_context_creation() {
    let context = ExecutionContext::new(String::from("query-123"));

    assert_eq!(context.query_id(), "query-123");
    assert!(!context.is_cancelled());
}
1615
/// Checks that cancelling the token obtained from an `ExecutionContext` is
/// visible both on the token itself and through the context.
#[test]
fn test_execution_context_cancellation_token() {
    let context = ExecutionContext::new(String::from("query-456"));
    let cancel_token = context.cancellation_token();

    // Nothing has requested cancellation yet.
    assert!(!cancel_token.is_cancelled());

    cancel_token.cancel();

    // Both views of the cancellation state must now agree.
    assert!(cancel_token.is_cancelled());
    assert!(context.is_cancelled());
}
1627
1628 #[tokio::test]
1629 async fn test_execute_with_context_success() {
1630 let schema = test_schema();
1631 let adapter = Arc::new(MockAdapter::new(vec![]));
1632 let executor = Executor::new(schema, adapter);
1633
1634 let ctx = ExecutionContext::new("test-query-1".to_string());
1635 let query = r"{ __schema { queryType { name } } }";
1636
1637 let result = executor.execute_with_context(query, None, &ctx).await;
1638 assert!(result.is_ok());
1639 assert!(result.unwrap().contains("__schema"));
1640 }
1641
1642 #[tokio::test]
1643 async fn test_execute_with_context_already_cancelled() {
1644 let schema = test_schema();
1645 let adapter = Arc::new(MockAdapter::new(vec![]));
1646 let executor = Executor::new(schema, adapter);
1647
1648 let ctx = ExecutionContext::new("test-query-2".to_string());
1649 let token = ctx.cancellation_token().clone();
1650
1651 token.cancel();
1653
1654 let query = r"{ __schema { queryType { name } } }";
1655 let result = executor.execute_with_context(query, None, &ctx).await;
1656
1657 assert!(result.is_err());
1658 match result.unwrap_err() {
1659 FraiseQLError::Cancelled { query_id, reason } => {
1660 assert_eq!(query_id, "test-query-2");
1661 assert!(reason.contains("before execution"));
1662 },
1663 e => panic!("Expected Cancelled error, got: {}", e),
1664 }
1665 }
1666
1667 #[tokio::test]
1668 async fn test_execute_with_context_cancelled_during_execution() {
1669 let schema = test_schema();
1670 let adapter = Arc::new(MockAdapter::new(vec![]));
1671 let executor = Executor::new(schema, adapter);
1672
1673 let ctx = ExecutionContext::new("test-query-3".to_string());
1674 let token = ctx.cancellation_token().clone();
1675
1676 tokio::spawn(async move {
1678 tokio::time::sleep(Duration::from_millis(10)).await;
1679 token.cancel();
1680 });
1681
1682 let query = r"{ __schema { queryType { name } } }";
1683 let result = executor.execute_with_context(query, None, &ctx).await;
1684
1685 if let Err(FraiseQLError::Cancelled { query_id, .. }) = result {
1688 assert_eq!(query_id, "test-query-3");
1689 }
1690 }
1691
/// Checks that a cloned `ExecutionContext` reports the same query id as the
/// original and observes a cancellation requested through the original's
/// token.
#[test]
fn test_execution_context_clone() {
    let original = ExecutionContext::new(String::from("query-clone"));
    let duplicate = original.clone();

    assert_eq!(original.query_id(), duplicate.query_id());
    assert!(!duplicate.is_cancelled());

    // Cancel via the original only.
    original.cancellation_token().cancel();

    // The clone must see that cancellation as well.
    assert!(duplicate.is_cancelled());
}
1706
/// Checks the properties of an error built with `FraiseQLError::cancelled`:
/// its display text, status code, error code, and the retryable and
/// server-error flags.
#[test]
fn test_error_cancelled_constructor() {
    let cancelled = FraiseQLError::cancelled("query-001", "user requested cancellation");

    let rendered = cancelled.to_string();
    assert!(rendered.contains("Query cancelled"));

    assert_eq!(cancelled.status_code(), 408);
    assert_eq!(cancelled.error_code(), "CANCELLED");
    assert!(cancelled.is_retryable());
    assert!(cancelled.is_server_error());
}
1717
/// Builds a `RuntimeConfig` whose JSONB options come from
/// `JsonbOptimizationOptions::default()` and checks that those defaults are
/// the `Project` strategy with an auto threshold of 80 percent.
#[test]
fn test_jsonb_strategy_in_runtime_config() {
    let jsonb_optimization = JsonbOptimizationOptions::default();

    let config = RuntimeConfig {
        cache_query_plans: false,
        max_query_depth: 5,
        max_query_complexity: 500,
        enable_tracing: true,
        field_filter: None,
        rls_policy: None,
        query_timeout_ms: 30_000,
        jsonb_optimization,
    };

    let jsonb = &config.jsonb_optimization;
    assert_eq!(jsonb.default_strategy, JsonbStrategy::Project);
    assert_eq!(jsonb.auto_threshold_percent, 80);
}
1739
/// Builds a `RuntimeConfig` with explicitly chosen JSONB options (`Stream`
/// strategy, 50 percent threshold) and checks that the config reports those
/// same values.
#[test]
fn test_jsonb_strategy_custom_config() {
    let config = RuntimeConfig {
        cache_query_plans: false,
        max_query_depth: 5,
        max_query_complexity: 500,
        enable_tracing: true,
        field_filter: None,
        rls_policy: None,
        query_timeout_ms: 30_000,
        // Non-default JSONB options supplied inline.
        jsonb_optimization: JsonbOptimizationOptions {
            default_strategy: JsonbStrategy::Stream,
            auto_threshold_percent: 50,
        },
    };

    let jsonb = &config.jsonb_optimization;
    assert_eq!(jsonb.default_strategy, JsonbStrategy::Stream);
    assert_eq!(jsonb.auto_threshold_percent, 50);
}
1762}