1use std::cell::Cell;
2use std::num::NonZeroU32;
3use std::ops::ControlFlow;
4use std::sync::Arc;
5
6use apollo_compiler::ExecutableDocument;
7use apollo_compiler::Name;
8use apollo_compiler::collections::IndexMap;
9use apollo_compiler::collections::IndexSet;
10use apollo_compiler::validation::Valid;
11use itertools::Itertools;
12use petgraph::visit::EdgeRef;
13use serde::Deserialize;
14use serde::Serialize;
15use tracing::trace;
16
17use super::ConditionNode;
18use super::QueryPlanCost;
19use super::fetch_dependency_graph::FetchIdGenerator;
20use crate::ApiSchemaOptions;
21use crate::Supergraph;
22use crate::bail;
23use crate::error::FederationError;
24use crate::error::SingleFederationError;
25use crate::internal_error;
26use crate::operation::NormalizedDefer;
27use crate::operation::Operation;
28use crate::operation::SelectionSet;
29use crate::operation::normalize_operation;
30use crate::query_graph::OverrideConditions;
31use crate::query_graph::QueryGraph;
32use crate::query_graph::QueryGraphNodeType;
33use crate::query_graph::build_federated_query_graph;
34use crate::query_graph::path_tree::OpPathTree;
35use crate::query_plan::PlanNode;
36use crate::query_plan::QueryPlan;
37use crate::query_plan::SequenceNode;
38use crate::query_plan::TopLevelPlanNode;
39use crate::query_plan::fetch_dependency_graph::FetchDependencyGraph;
40use crate::query_plan::fetch_dependency_graph::FetchDependencyGraphNodePath;
41use crate::query_plan::fetch_dependency_graph::compute_nodes_for_tree;
42use crate::query_plan::fetch_dependency_graph_processor::FetchDependencyGraphProcessor;
43use crate::query_plan::fetch_dependency_graph_processor::FetchDependencyGraphToCostProcessor;
44use crate::query_plan::fetch_dependency_graph_processor::FetchDependencyGraphToQueryPlanProcessor;
45use crate::query_plan::query_planning_traversal::BestQueryPlanInfo;
46use crate::query_plan::query_planning_traversal::QueryPlanningParameters;
47use crate::query_plan::query_planning_traversal::QueryPlanningTraversal;
48use crate::query_plan::query_planning_traversal::convert_type_from_subgraph;
49use crate::query_plan::query_planning_traversal::non_local_selections_estimation;
50use crate::schema::ValidFederationSchema;
51use crate::schema::position::AbstractTypeDefinitionPosition;
52use crate::schema::position::CompositeTypeDefinitionPosition;
53use crate::schema::position::InterfaceTypeDefinitionPosition;
54use crate::schema::position::ObjectTypeDefinitionPosition;
55use crate::schema::position::OutputTypeDefinitionPosition;
56use crate::schema::position::SchemaRootDefinitionKind;
57use crate::schema::position::TypeDefinitionPosition;
58use crate::utils::logging::snapshot;
59
/// Configuration for a [`QueryPlanner`].
///
/// The planner stores a copy of this value and clones it into the planning parameters of
/// every `build_query_plan` call.
#[derive(Debug, Clone, Hash, Serialize)]
pub struct QueryPlannerConfig {
    /// When `true`, `build_query_plan` selects
    /// `SubgraphOperationCompression::GenerateFragments` for subgraph operations; when `false`
    /// compression is `Disabled`.
    pub generate_query_fragments: bool,

    /// NOTE(review): not read anywhere in this file; presumably toggles GraphQL validation of
    /// the generated subgraph operations — confirm against the consumer.
    pub subgraph_graphql_validation: bool,

    /// Incremental delivery settings; `enable_defer` is read by `QueryPlanner::new` when the
    /// API schema is built.
    pub incremental_delivery: QueryPlanIncrementalDeliveryConfig,

    /// Debug limits. NOTE(review): not read in this file; presumably consumed by the planning
    /// traversal — confirm.
    pub debug: QueryPlannerDebugConfig,

    /// Forwarded to `compute_root_fetch_groups` as `type_conditioned_fetching_enabled` when
    /// mutation fetch groups are recomputed.
    pub type_conditioned_fetching: bool,
}
97
98#[allow(clippy::derivable_impls)] impl Default for QueryPlannerConfig {
100 fn default() -> Self {
101 Self {
102 generate_query_fragments: false,
103 subgraph_graphql_validation: false,
104 incremental_delivery: Default::default(),
105 debug: Default::default(),
106 type_conditioned_fetching: false,
107 }
108 }
109}
110
/// Incremental delivery settings for the query planner.
#[derive(Debug, Clone, Default, Hash, Serialize)]
pub struct QueryPlanIncrementalDeliveryConfig {
    /// Passed as `ApiSchemaOptions::include_defer` when `QueryPlanner::new` builds the API
    /// schema. The derived `Default` leaves it `false`.
    #[serde(default)]
    pub enable_defer: bool,
}
125
/// Debug-oriented limits for query planning.
///
/// NOTE(review): neither field is read in this file; the descriptions below are inferred from
/// the field names and should be confirmed against the planning traversal.
#[derive(Debug, Clone, Hash, Serialize)]
pub struct QueryPlannerDebugConfig {
    /// Presumably an upper bound on the number of candidate plans evaluated per operation.
    /// The `Default` impl sets it to 10 000.
    pub max_evaluated_plans: NonZeroU32,

    /// Presumably an optional cap on the number of paths considered; `None` by default.
    pub paths_limit: Option<u32>,
}
161
162impl Default for QueryPlannerDebugConfig {
163 fn default() -> Self {
164 Self {
165 max_evaluated_plans: NonZeroU32::new(10_000).unwrap(),
166 paths_limit: None,
167 }
168 }
169}
170
/// Statistics attached to a [`QueryPlan`].
///
/// `build_query_plan` creates a default value, hands a shared reference to it to the planning
/// parameters, and finally moves it into the returned plan with `best_plan_cost` filled in.
#[derive(Debug, Clone, PartialEq, Default, Serialize, Deserialize)]
pub struct QueryPlanningStatistics {
    /// A `Cell`, so it can be updated through the shared reference held by the planning
    /// parameters. Presumably the number of plans evaluated — confirm against the traversal.
    pub evaluated_plan_count: Cell<usize>,
    /// A `Cell` for the same reason. Presumably the number of paths evaluated — confirm.
    pub evaluated_plan_paths: Cell<usize>,
    /// Cost returned by plan computation. It is `f64::NAN` for mutations (see
    /// `compute_plan_internal`); a value that deserializes to `None` (e.g. `null`) is read
    /// back as `f64::NAN`.
    #[serde(deserialize_with = "deserialize_f64_nullable")]
    pub best_plan_cost: f64,
}
180
181fn deserialize_f64_nullable<'de, D>(deserializer: D) -> Result<f64, D::Error>
184where
185 D: serde::de::Deserializer<'de>,
186{
187 let opt = Option::<f64>::deserialize(deserializer)?;
189 Ok(opt.unwrap_or(f64::NAN))
191}
192
/// Per-call options for `QueryPlanner::build_query_plan`.
#[derive(Clone)]
pub struct QueryPlanOptions<'a> {
    /// Labels collected into a set and handed to `OverrideConditions::new` together with the
    /// federated query graph.
    pub override_conditions: Vec<String>,
    /// Optional callback consulted during normalization and planning.
    /// NOTE(review): presumably `ControlFlow::Break` requests cancellation — confirm against
    /// `QueryPlanningParameters::check_cancellation_with`.
    pub check_for_cooperative_cancellation: Option<&'a dyn Fn() -> ControlFlow<()>>,
    /// When `true` (the default), `build_query_plan` creates a
    /// `non_local_selections_estimation::State` and threads it through planning.
    pub non_local_selections_limit_enabled: bool,
    /// Subgraph names that, when present in the query graph, are collected into the planning
    /// parameters' `disabled_subgraphs`.
    pub disabled_subgraph_names: IndexSet<String>,
}
220
221impl Default for QueryPlanOptions<'_> {
222 fn default() -> Self {
223 Self {
224 override_conditions: Vec::new(),
225 check_for_cooperative_cancellation: None,
226 non_local_selections_limit_enabled: true,
227 disabled_subgraph_names: Default::default(),
228 }
229 }
230}
231
232impl std::fmt::Debug for QueryPlanOptions<'_> {
233 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
234 f.debug_struct("QueryPlanOptions")
235 .field("override_conditions", &self.override_conditions)
236 .field(
237 "check_for_cooperative_cancellation",
238 if self.check_for_cooperative_cancellation.is_some() {
239 &"Some(...)"
240 } else {
241 &"None"
242 },
243 )
244 .field(
245 "non_local_selections_limit_enabled",
246 &self.non_local_selections_limit_enabled,
247 )
248 .finish()
249 }
250}
251
/// Plans operations against a supergraph.
///
/// Everything stored here is computed once by `QueryPlanner::new` and reused by each
/// `build_query_plan` call.
pub struct QueryPlanner {
    /// Planner configuration; cloned into the planning parameters of each call.
    config: QueryPlannerConfig,
    /// Federated query graph built from the supergraph and API schemas.
    federated_query_graph: Arc<QueryGraph>,
    /// Clone of the supergraph's schema.
    supergraph_schema: ValidFederationSchema,
    /// API schema derived from the supergraph; operations are normalized against it.
    api_schema: ValidFederationSchema,
    /// Supergraph interface types for which at least one subgraph reports the same-named type
    /// as an interface object.
    interface_types_with_interface_objects: IndexSet<InterfaceTypeDefinitionPosition>,
    /// Names of abstract types (interfaces/unions) whose set of runtime object types is not
    /// the same in every subgraph that defines them as an interface or union.
    abstract_types_with_inconsistent_runtime_types: IndexSet<Name>,
}
266
267impl QueryPlanner {
268 #[cfg_attr(
269 feature = "snapshot_tracing",
270 tracing::instrument(level = "trace", skip_all, name = "QueryPlanner::new")
271 )]
272 pub fn new(
273 supergraph: &Supergraph,
274 config: QueryPlannerConfig,
275 ) -> Result<Self, FederationError> {
276 let supergraph_schema = supergraph.schema.clone();
277 let api_schema = supergraph.to_api_schema(ApiSchemaOptions {
278 include_defer: config.incremental_delivery.enable_defer,
279 ..Default::default()
280 })?;
281 let query_graph = build_federated_query_graph(
282 supergraph_schema.clone(),
283 api_schema.clone(),
284 Some(true),
285 Some(true),
286 )?;
287
288 let interface_types_with_interface_objects = supergraph
289 .schema
290 .get_types()
291 .filter_map(|position| match position {
292 TypeDefinitionPosition::Interface(interface_position) => Some(interface_position),
293 _ => None,
294 })
295 .map(|position| {
296 let is_interface_object = query_graph
297 .subgraphs()
298 .map(|(_name, schema)| {
299 let Some(position) = schema.try_get_type(position.type_name.clone()) else {
300 return Ok(false);
301 };
302 schema.is_interface_object_type(position)
303 })
304 .process_results(|mut iter| iter.any(|b| b))?;
305 Ok::<_, FederationError>((position, is_interface_object))
306 })
307 .process_results(|iter| {
308 iter.flat_map(|(position, is_interface_object)| {
309 if is_interface_object {
310 Some(position)
311 } else {
312 None
313 }
314 })
315 .collect::<IndexSet<_>>()
316 })?;
317
318 let is_inconsistent = |position: AbstractTypeDefinitionPosition| {
319 let mut sources = query_graph.subgraphs().filter_map(|(_name, subgraph)| {
320 match subgraph.try_get_type(position.type_name().clone())? {
321 TypeDefinitionPosition::Object(_) => None,
326 TypeDefinitionPosition::Interface(interface) => Some(
327 subgraph
328 .referencers()
329 .get_interface_type(&interface.type_name)
330 .ok()?
331 .object_types
332 .clone(),
333 ),
334 TypeDefinitionPosition::Union(union_) => Some(
335 union_
336 .try_get(subgraph.schema())?
337 .members
338 .iter()
339 .map(|member| ObjectTypeDefinitionPosition::new(member.name.clone()))
340 .collect(),
341 ),
342 _ => None,
343 }
344 });
345
346 let Some(expected_runtimes) = sources.next() else {
347 return false;
348 };
349 !sources.all(|runtimes| runtimes == expected_runtimes)
350 };
351
352 let abstract_types_with_inconsistent_runtime_types = supergraph
353 .schema
354 .get_types()
355 .filter_map(|position| AbstractTypeDefinitionPosition::try_from(position).ok())
356 .filter(|position| is_inconsistent(position.clone()))
357 .map(|position| position.type_name().clone())
358 .collect::<IndexSet<_>>();
359
360 Ok(Self {
361 config,
362 federated_query_graph: Arc::new(query_graph),
363 supergraph_schema,
364 api_schema,
365 interface_types_with_interface_objects,
366 abstract_types_with_inconsistent_runtime_types,
367 })
368 }
369
370 pub fn subgraph_schemas(&self) -> &IndexMap<Arc<str>, ValidFederationSchema> {
371 self.federated_query_graph.subgraph_schemas()
372 }
373
    /// Builds a query plan for one operation of `document`.
    ///
    /// `operation_name` selects the operation; `options` carries per-call settings (override
    /// conditions, cancellation callback, non-local selections limit, disabled subgraphs).
    ///
    /// # Errors
    /// - `UnknownOperation` / `OperationNameNotProvided` when the operation lookup fails.
    /// - An error when the operation's selection set is empty.
    /// - `DeferredSubscriptionUnsupported` when a subscription uses `@defer`.
    /// - An error when the query graph has no root for the operation's root kind, or when a
    ///   subscription plan does not start with a fetch.
    /// - Any error propagated from normalization or planning.
    #[cfg_attr(
        feature = "snapshot_tracing",
        tracing::instrument(level = "trace", skip_all, name = "QueryPlanner::build_query_plan")
    )]
    pub fn build_query_plan(
        &self,
        document: &Valid<ExecutableDocument>,
        operation_name: Option<Name>,
        options: QueryPlanOptions,
    ) -> Result<QueryPlan, FederationError> {
        // The lookup error depends on whether the caller supplied a name.
        let operation = document
            .operations
            .get(operation_name.as_ref().map(|name| name.as_str()))
            .map_err(|_| {
                if operation_name.is_some() {
                    SingleFederationError::UnknownOperation
                } else {
                    SingleFederationError::OperationNameNotProvided
                }
            })?;
        if operation.selection_set.is_empty() {
            crate::bail!("Invalid operation: empty selection set")
        }

        let is_subscription = operation.is_subscription();

        // Shared by reference with the planning parameters, then moved into the final plan.
        let statistics = QueryPlanningStatistics::default();

        // Normalize against the API schema; the closure lets normalization check cancellation.
        let normalized_operation = normalize_operation(
            operation,
            &document.fragments,
            &self.api_schema,
            &self.interface_types_with_interface_objects,
            &|| {
                QueryPlanningParameters::check_cancellation_with(
                    &options.check_for_cooperative_cancellation,
                )
            },
        )?;

        let NormalizedDefer {
            operation: normalized_operation,
            assigned_defer_labels,
            defer_conditions,
            has_defers,
        } = normalized_operation.with_normalized_defer()?;
        if has_defers && is_subscription {
            return Err(SingleFederationError::DeferredSubscriptionUnsupported.into());
        }

        // Nothing left to fetch after normalization: return an empty plan.
        if normalized_operation.selection_set.is_empty() {
            return Ok(QueryPlan::default());
        }

        snapshot!(
            "NormalizedOperation",
            serde_json_bytes::json!({
                "original": &operation.serialize().to_string(),
                "normalized": &normalized_operation.to_string()
            })
            .to_string(),
            "normalized operation"
        );

        // Find the query graph root node for this operation's root kind.
        let Some(root) = self
            .federated_query_graph
            .root_kinds_to_nodes()?
            .get(&normalized_operation.root_kind)
        else {
            bail!(
                "Shouldn't have a {0} operation if the subgraphs don't have a {0} root",
                normalized_operation.root_kind
            )
        };

        let operation_compression = if self.config.generate_query_fragments {
            SubgraphOperationCompression::GenerateFragments
        } else {
            SubgraphOperationCompression::Disabled
        };
        let mut processor = FetchDependencyGraphToQueryPlanProcessor::new(
            normalized_operation.variables.clone(),
            normalized_operation.directives.clone(),
            operation_compression,
            operation.name.clone(),
            assigned_defer_labels,
        );
        let mut parameters = QueryPlanningParameters {
            supergraph_schema: self.supergraph_schema.clone(),
            federated_query_graph: self.federated_query_graph.clone(),
            operation: Arc::new(normalized_operation),
            head: *root,
            head_must_be_root: true,
            statistics: &statistics,
            abstract_types_with_inconsistent_runtime_types: self
                .abstract_types_with_inconsistent_runtime_types
                .clone()
                .into(),
            config: self.config.clone(),
            override_conditions: OverrideConditions::new(
                &self.federated_query_graph,
                &IndexSet::from_iter(options.override_conditions),
            ),
            check_for_cooperative_cancellation: options.check_for_cooperative_cancellation,
            fetch_id_generator: Arc::new(FetchIdGenerator::new()),
            // Only names that match a subgraph of the query graph are kept.
            disabled_subgraphs: self
                .federated_query_graph
                .subgraphs()
                .filter_map(|(subgraph, _)| {
                    if options.disabled_subgraph_names.contains(subgraph.as_ref()) {
                        Some(subgraph.clone())
                    } else {
                        None
                    }
                })
                .collect(),
        };

        // `Some(default state)` when the limit is enabled, `None` otherwise.
        let mut non_local_selection_state = options
            .non_local_selections_limit_enabled
            .then(non_local_selections_estimation::State::default);
        // Conditional `@defer`s get one plan per combination, wrapped in condition nodes.
        let (root_node, cost) = if !defer_conditions.is_empty() {
            compute_plan_for_defer_conditionals(
                &mut parameters,
                &mut processor,
                defer_conditions,
                &mut non_local_selection_state,
            )
        } else {
            compute_plan_internal(
                &mut parameters,
                &mut processor,
                has_defers,
                &mut non_local_selection_state,
            )
        }?;

        // Convert the plan root to a top-level node. For subscriptions the first fetch becomes
        // `primary` and any remaining sequence nodes become `rest`.
        let root_node = match root_node {
            Some(PlanNode::Fetch(root_node)) if is_subscription => Some(
                TopLevelPlanNode::Subscription(crate::query_plan::SubscriptionNode {
                    primary: root_node,
                    rest: None,
                }),
            ),
            Some(PlanNode::Sequence(root_node)) if is_subscription => {
                let Some((primary, rest)) = root_node.nodes.split_first() else {
                    bail!("Invalid query plan: Sequence must have at least one node");
                };
                let PlanNode::Fetch(primary) = primary.clone() else {
                    bail!("Invalid query plan: Primary node of a subscription is not a Fetch");
                };
                let rest = PlanNode::Sequence(SequenceNode {
                    nodes: rest.to_vec(),
                });
                Some(TopLevelPlanNode::Subscription(
                    crate::query_plan::SubscriptionNode {
                        primary,
                        rest: Some(Box::new(rest)),
                    },
                ))
            }
            // Any other root shape is invalid for a subscription.
            Some(node) if is_subscription => {
                bail!(
                    "Invalid query plan for subscription: unexpected {} at root",
                    node.node_kind()
                );
            }
            Some(PlanNode::Fetch(inner)) => Some(TopLevelPlanNode::Fetch(inner)),
            Some(PlanNode::Sequence(inner)) => Some(TopLevelPlanNode::Sequence(inner)),
            Some(PlanNode::Parallel(inner)) => Some(TopLevelPlanNode::Parallel(inner)),
            Some(PlanNode::Flatten(inner)) => Some(TopLevelPlanNode::Flatten(inner)),
            Some(PlanNode::Defer(inner)) => Some(TopLevelPlanNode::Defer(inner)),
            Some(PlanNode::Condition(inner)) => Some(TopLevelPlanNode::Condition(inner)),
            None => None,
        };

        // Keep the counters gathered during planning and record the final cost.
        let plan = QueryPlan {
            node: root_node,
            statistics: QueryPlanningStatistics {
                best_plan_cost: cost,
                ..statistics
            },
        };

        snapshot!(
            "QueryPlan",
            plan.to_string(),
            "QueryPlan from build_query_plan"
        );
        snapshot!(
            plan.statistics,
            "QueryPlanningStatistics from build_query_plan"
        );

        Ok(plan)
    }
578
579 pub fn api_schema(&self) -> &ValidFederationSchema {
581 &self.api_schema
582 }
583
584 pub fn supergraph_schema(&self) -> &ValidFederationSchema {
585 &self.supergraph_schema
586 }
587
588 pub fn override_condition_labels(&self) -> &IndexSet<Arc<str>> {
589 self.federated_query_graph.override_condition_labels()
590 }
591}
592
593fn compute_root_serial_dependency_graph_for_mutation(
594 parameters: &QueryPlanningParameters,
595 has_defers: bool,
596 non_local_selection_state: &mut Option<non_local_selections_estimation::State>,
597) -> Result<Vec<FetchDependencyGraph>, FederationError> {
598 let QueryPlanningParameters {
599 supergraph_schema,
600 federated_query_graph,
601 operation,
602 ..
603 } = parameters;
604 let root_type: Option<CompositeTypeDefinitionPosition> = if has_defers {
605 supergraph_schema
606 .schema()
607 .root_operation(operation.root_kind.into())
608 .and_then(|name| supergraph_schema.get_type(name.clone()).ok())
609 .and_then(|ty| ty.try_into().ok())
610 } else {
611 None
612 };
613 let mut split_roots = operation.selection_set.clone().split_top_level_fields();
615 let mut digest = Vec::new();
616 let selection_set = split_roots
617 .next()
618 .ok_or_else(|| FederationError::internal("Empty top level fields"))?;
619 let BestQueryPlanInfo {
620 mut fetch_dependency_graph,
621 path_tree: mut prev_path,
622 ..
623 } = compute_root_parallel_best_plan_for_mutation(
624 parameters,
625 selection_set,
626 has_defers,
627 non_local_selection_state,
628 )?;
629 let mut prev_subgraph = only_root_subgraph(&fetch_dependency_graph)?;
630 for selection_set in split_roots {
631 let BestQueryPlanInfo {
632 fetch_dependency_graph: new_dep_graph,
633 path_tree: new_path,
634 ..
635 } = compute_root_parallel_best_plan_for_mutation(
636 parameters,
637 selection_set,
638 has_defers,
639 non_local_selection_state,
640 )?;
641 let new_subgraph = only_root_subgraph(&new_dep_graph)?;
642 if new_subgraph == prev_subgraph {
643 Arc::make_mut(&mut prev_path).extend(&new_path);
653 fetch_dependency_graph = FetchDependencyGraph::new(
654 supergraph_schema.clone(),
655 federated_query_graph.clone(),
656 root_type.clone(),
657 fetch_dependency_graph.fetch_id_generation.clone(),
658 );
659 compute_root_fetch_groups(
660 operation.root_kind,
661 federated_query_graph,
662 &mut fetch_dependency_graph,
663 &prev_path,
664 parameters.config.type_conditioned_fetching,
665 &|| parameters.check_cancellation(),
666 )?;
667 } else {
668 digest.push(std::mem::replace(
673 &mut fetch_dependency_graph,
674 new_dep_graph,
675 ));
676 prev_path = new_path;
677 prev_subgraph = new_subgraph;
678 }
679 }
680 digest.push(fetch_dependency_graph);
681 Ok(digest)
682}
683
684fn only_root_subgraph(graph: &FetchDependencyGraph) -> Result<Arc<str>, FederationError> {
685 let mut iter = graph.root_node_by_subgraph_iter();
686 let (Some((name, _)), None) = (iter.next(), iter.next()) else {
687 return Err(FederationError::internal(format!(
688 "{graph} should have only one root."
689 )));
690 };
691 Ok(name.clone())
692}
693
694#[cfg_attr(
695 feature = "snapshot_tracing",
696 tracing::instrument(level = "trace", skip_all, name = "compute_root_fetch_groups")
697)]
698pub(crate) fn compute_root_fetch_groups(
699 root_kind: SchemaRootDefinitionKind,
700 federated_query_graph: &QueryGraph,
701 dependency_graph: &mut FetchDependencyGraph,
702 path: &OpPathTree,
703 type_conditioned_fetching_enabled: bool,
704 check_cancellation: &dyn Fn() -> Result<(), SingleFederationError>,
705) -> Result<(), FederationError> {
706 for child in &path.childs {
713 let edge = child.edge.expect("The root edge should not be None");
714 let (_source_node, target_node) = path.graph.edge_endpoints(edge)?;
715 let target_node = path.graph.node_weight(target_node)?;
716 let subgraph_name = &target_node.source;
717 let root_type: CompositeTypeDefinitionPosition = match &target_node.type_ {
718 QueryGraphNodeType::SchemaType(OutputTypeDefinitionPosition::Object(object)) => {
719 object.clone().into()
720 }
721 ty => {
722 return Err(FederationError::internal(format!(
723 "expected an object type for the root of a subgraph, found {ty}"
724 )));
725 }
726 };
727 let fetch_dependency_node = dependency_graph.get_or_create_root_node(
728 subgraph_name,
729 root_kind,
730 root_type.clone(),
731 )?;
732 snapshot!(
733 "FetchDependencyGraph",
734 dependency_graph.to_dot(),
735 "tree_with_root_node"
736 );
737 let subgraph_schema = federated_query_graph.schema_by_source(subgraph_name)?;
738 let supergraph_root_type = convert_type_from_subgraph(
739 root_type,
740 subgraph_schema,
741 &dependency_graph.supergraph_schema,
742 )?;
743 compute_nodes_for_tree(
744 dependency_graph,
745 &child.tree,
746 fetch_dependency_node,
747 FetchDependencyGraphNodePath::new(
748 dependency_graph.supergraph_schema.clone(),
749 type_conditioned_fetching_enabled,
750 supergraph_root_type,
751 )?,
752 Default::default(),
753 &Default::default(),
754 check_cancellation,
755 )?;
756 }
757 Ok(())
758}
759
760fn compute_root_parallel_dependency_graph(
761 parameters: &QueryPlanningParameters,
762 has_defers: bool,
763 non_local_selection_state: &mut Option<non_local_selections_estimation::State>,
764) -> Result<(FetchDependencyGraph, QueryPlanCost), FederationError> {
765 trace!("Starting process to construct a parallel fetch dependency graph");
766 let selection_set = parameters.operation.selection_set.clone();
767 let best_plan = compute_root_parallel_best_plan(
768 parameters,
769 selection_set,
770 has_defers,
771 non_local_selection_state,
772 )?;
773 snapshot!(
774 "FetchDependencyGraph",
775 best_plan.fetch_dependency_graph.to_dot(),
776 "Fetch dependency graph returned from compute_root_parallel_best_plan"
777 );
778 Ok((best_plan.fetch_dependency_graph, best_plan.cost))
779}
780
781fn compute_root_parallel_best_plan(
782 parameters: &QueryPlanningParameters,
783 selection: SelectionSet,
784 has_defers: bool,
785 non_local_selection_state: &mut Option<non_local_selections_estimation::State>,
786) -> Result<BestQueryPlanInfo, FederationError> {
787 let planning_traversal = QueryPlanningTraversal::new(
788 parameters,
789 selection,
790 has_defers,
791 parameters.operation.root_kind,
792 FetchDependencyGraphToCostProcessor,
793 non_local_selection_state.as_mut(),
794 None,
795 )?;
796
797 Ok(planning_traversal
800 .find_best_plan()?
801 .unwrap_or_else(|| BestQueryPlanInfo::empty(parameters)))
802}
803
804fn compute_root_parallel_best_plan_for_mutation(
805 parameters: &QueryPlanningParameters,
806 selection: SelectionSet,
807 has_defers: bool,
808 non_local_selection_state: &mut Option<non_local_selections_estimation::State>,
809) -> Result<BestQueryPlanInfo, FederationError> {
810 parameters.federated_query_graph.out_edges(parameters.head).into_iter().map(|edge_ref| {
811 let mutation_subgraph = parameters.federated_query_graph.node_weight(edge_ref.target())?.source.clone();
812 let planning_traversal = QueryPlanningTraversal::new(
813 parameters,
814 selection.clone(),
815 has_defers,
816 parameters.operation.root_kind,
817 FetchDependencyGraphToCostProcessor,
818 non_local_selection_state.as_mut(),
819 Some(mutation_subgraph),
820 )?;
821 planning_traversal.find_best_plan()
822 }).process_results(|iter| iter
823 .flatten()
824 .min_by(|a, b| a.cost.total_cmp(&b.cost))
825 .map(Ok)
826 .unwrap_or_else(|| {
827 if parameters.disabled_subgraphs.is_empty() {
828 Err(FederationError::internal(format!(
829 "Was not able to plan {} starting from a single subgraph: This shouldn't have happened.",
830 parameters.operation,
831 )))
832 } else {
833 Err(SingleFederationError::NoPlanFoundWithDisabledSubgraphs.into())
836 }
837 })
838 )?
839}
840
841fn compute_plan_internal(
842 parameters: &mut QueryPlanningParameters,
843 processor: &mut FetchDependencyGraphToQueryPlanProcessor,
844 has_defers: bool,
845 non_local_selection_state: &mut Option<non_local_selections_estimation::State>,
846) -> Result<(Option<PlanNode>, QueryPlanCost), FederationError> {
847 let root_kind = parameters.operation.root_kind;
848
849 let (main, deferred, primary_selection, cost) = if root_kind
850 == SchemaRootDefinitionKind::Mutation
851 {
852 let dependency_graphs = compute_root_serial_dependency_graph_for_mutation(
853 parameters,
854 has_defers,
855 non_local_selection_state,
856 )?;
857 let mut main = None;
858 let mut deferred = vec![];
859 let mut primary_selection = None::<SelectionSet>;
860 for mut dependency_graph in dependency_graphs {
861 let (local_main, local_deferred) =
862 dependency_graph.process(&mut *processor, root_kind)?;
863 main = match main {
864 Some(unlocal_main) => processor.reduce_sequence([Some(unlocal_main), local_main]),
865 None => local_main,
866 };
867 deferred.extend(local_deferred);
868 let new_selection = dependency_graph.defer_tracking.primary_selection;
869 match primary_selection.as_mut() {
870 Some(selection) => {
871 if let Some(new_selection) = new_selection {
872 selection.add_local_selection_set(&new_selection)?
873 }
874 }
875 None => primary_selection = new_selection,
876 }
877 }
878 (main, deferred, primary_selection, f64::NAN)
880 } else {
881 let (mut dependency_graph, cost) = compute_root_parallel_dependency_graph(
882 parameters,
883 has_defers,
884 non_local_selection_state,
885 )?;
886
887 let (main, deferred) = dependency_graph.process(&mut *processor, root_kind)?;
888 snapshot!(
889 "FetchDependencyGraph",
890 dependency_graph.to_dot(),
891 "Plan after calling FetchDependencyGraph::process"
892 );
893 let primary_selection = dependency_graph.defer_tracking.primary_selection;
895
896 (main, deferred, primary_selection, cost)
897 };
898
899 if deferred.is_empty() {
900 Ok((main, cost))
901 } else {
902 let Some(primary_selection) = primary_selection else {
903 unreachable!("Should have had a primary selection created");
904 };
905 let reduced_main = processor.reduce_defer(main, &primary_selection, deferred)?;
906 Ok((reduced_main, cost))
907 }
908}
909
910fn compute_plan_for_defer_conditionals(
911 parameters: &mut QueryPlanningParameters,
912 processor: &mut FetchDependencyGraphToQueryPlanProcessor,
913 defer_conditions: IndexMap<Name, IndexSet<String>>,
914 non_local_selection_state: &mut Option<non_local_selections_estimation::State>,
915) -> Result<(Option<PlanNode>, QueryPlanCost), FederationError> {
916 generate_condition_nodes(
917 parameters.operation.clone(),
918 defer_conditions.iter(),
919 &mut |op| {
920 parameters.operation = op;
921 compute_plan_internal(parameters, processor, true, non_local_selection_state)
922 },
923 )
924}
925
926fn generate_condition_nodes<'a>(
927 op: Arc<Operation>,
928 mut conditions: impl Clone + Iterator<Item = (&'a Name, &'a IndexSet<String>)>,
929 on_final_operation: &mut impl FnMut(
930 Arc<Operation>,
931 ) -> Result<(Option<PlanNode>, f64), FederationError>,
932) -> Result<(Option<PlanNode>, f64), FederationError> {
933 match conditions.next() {
934 None => on_final_operation(op),
935 Some((cond, labels)) => {
936 let else_op = Arc::unwrap_or_clone(op.clone()).reduce_defer(labels)?;
937 let if_op = op;
938 let (if_node, if_cost) =
939 generate_condition_nodes(if_op, conditions.clone(), on_final_operation)?;
940 let (else_node, else_cost) = generate_condition_nodes(
941 Arc::new(else_op),
942 conditions.clone(),
943 on_final_operation,
944 )?;
945 let node = ConditionNode {
946 condition_variable: cond.clone(),
947 if_clause: if_node.map(Box::new),
948 else_clause: else_node.map(Box::new),
949 };
950 Ok((
951 Some(PlanNode::Condition(Box::new(node))),
952 if_cost.max(else_cost),
953 ))
954 }
955 }
956}
957
/// Strategy used to turn a subgraph `Operation` into an executable document.
///
/// `build_query_plan` picks the variant from `QueryPlannerConfig::generate_query_fragments`.
pub(crate) enum SubgraphOperationCompression {
    /// The document is produced with `Operation::generate_fragments`.
    GenerateFragments,
    /// The operation is converted to a document directly, with no fragment generation.
    Disabled,
}
962
963impl SubgraphOperationCompression {
964 pub(crate) fn compress(
966 &mut self,
967 operation: Operation,
968 ) -> Result<Valid<ExecutableDocument>, FederationError> {
969 match self {
970 Self::GenerateFragments => Ok(operation.generate_fragments()?),
971 Self::Disabled => {
972 let operation_document = operation.try_into().map_err(|err: FederationError| {
973 if err.has_invalid_graphql_error() {
974 internal_error!(
975 "Query planning produced an invalid subgraph operation.\n{err}"
976 )
977 } else {
978 err
979 }
980 })?;
981 Ok(operation_document)
982 }
983 }
984 }
985}
986
987#[cfg(test)]
988mod tests {
989 use super::*;
990
991 const TEST_SUPERGRAPH: &str = r#"
992schema
993 @link(url: "https://specs.apollo.dev/link/v1.0")
994 @link(url: "https://specs.apollo.dev/join/v0.2", for: EXECUTION)
995{
996 query: Query
997}
998
999directive @join__field(graph: join__Graph!, requires: join__FieldSet, provides: join__FieldSet, type: String, external: Boolean, override: String, usedOverridden: Boolean) repeatable on FIELD_DEFINITION | INPUT_FIELD_DEFINITION
1000
1001directive @join__graph(name: String!, url: String!) on ENUM_VALUE
1002
1003directive @join__implements(graph: join__Graph!, interface: String!) repeatable on OBJECT | INTERFACE
1004
1005directive @join__type(graph: join__Graph!, key: join__FieldSet, extension: Boolean! = false, resolvable: Boolean! = true) repeatable on OBJECT | INTERFACE | UNION | ENUM | INPUT_OBJECT | SCALAR
1006
1007directive @link(url: String, as: String, for: link__Purpose, import: [link__Import]) repeatable on SCHEMA
1008
1009type Book implements Product
1010 @join__implements(graph: PRODUCTS, interface: "Product")
1011 @join__implements(graph: REVIEWS, interface: "Product")
1012 @join__type(graph: PRODUCTS, key: "id")
1013 @join__type(graph: REVIEWS, key: "id")
1014{
1015 id: ID!
1016 price: Price @join__field(graph: PRODUCTS)
1017 title: String @join__field(graph: PRODUCTS)
1018 vendor: User @join__field(graph: PRODUCTS)
1019 pages: Int @join__field(graph: PRODUCTS)
1020 avg_rating: Int @join__field(graph: PRODUCTS, requires: "reviews { rating }")
1021 reviews: [Review] @join__field(graph: PRODUCTS, external: true) @join__field(graph: REVIEWS)
1022}
1023
1024enum Currency
1025 @join__type(graph: PRODUCTS)
1026{
1027 USD
1028 EUR
1029}
1030
1031scalar join__FieldSet
1032
1033enum join__Graph {
1034 ACCOUNTS @join__graph(name: "accounts", url: "")
1035 PRODUCTS @join__graph(name: "products", url: "")
1036 REVIEWS @join__graph(name: "reviews", url: "")
1037}
1038
1039scalar link__Import
1040
1041enum link__Purpose {
1042 """
1043 `SECURITY` features provide metadata necessary to securely resolve fields.
1044 """
1045 SECURITY
1046
1047 """
1048 `EXECUTION` features provide metadata necessary for operation execution.
1049 """
1050 EXECUTION
1051}
1052
1053type Movie implements Product
1054 @join__implements(graph: PRODUCTS, interface: "Product")
1055 @join__implements(graph: REVIEWS, interface: "Product")
1056 @join__type(graph: PRODUCTS, key: "id")
1057 @join__type(graph: REVIEWS, key: "id")
1058{
1059 id: ID!
1060 price: Price @join__field(graph: PRODUCTS)
1061 title: String @join__field(graph: PRODUCTS)
1062 vendor: User @join__field(graph: PRODUCTS)
1063 length_minutes: Int @join__field(graph: PRODUCTS)
1064 avg_rating: Int @join__field(graph: PRODUCTS, requires: "reviews { rating }")
1065 reviews: [Review] @join__field(graph: PRODUCTS, external: true) @join__field(graph: REVIEWS)
1066}
1067
1068type Price
1069 @join__type(graph: PRODUCTS)
1070{
1071 value: Int
1072 currency: Currency
1073}
1074
1075interface Product
1076 @join__type(graph: PRODUCTS)
1077 @join__type(graph: REVIEWS)
1078{
1079 id: ID!
1080 price: Price @join__field(graph: PRODUCTS)
1081 vendor: User @join__field(graph: PRODUCTS)
1082 avg_rating: Int @join__field(graph: PRODUCTS)
1083 reviews: [Review] @join__field(graph: REVIEWS)
1084}
1085
1086type Query
1087 @join__type(graph: ACCOUNTS)
1088 @join__type(graph: PRODUCTS)
1089 @join__type(graph: REVIEWS)
1090{
1091 userById(id: ID!): User @join__field(graph: ACCOUNTS)
1092 me: User! @join__field(graph: ACCOUNTS) @join__field(graph: REVIEWS)
1093 productById(id: ID!): Product @join__field(graph: PRODUCTS)
1094 search(filter: SearchFilter): [Product] @join__field(graph: PRODUCTS)
1095 bestRatedProducts(limit: Int): [Product] @join__field(graph: REVIEWS)
1096}
1097
1098type Review
1099 @join__type(graph: PRODUCTS)
1100 @join__type(graph: REVIEWS)
1101{
1102 rating: Int @join__field(graph: PRODUCTS, external: true) @join__field(graph: REVIEWS)
1103 product: Product @join__field(graph: REVIEWS)
1104 author: User @join__field(graph: REVIEWS)
1105 text: String @join__field(graph: REVIEWS)
1106}
1107
1108input SearchFilter
1109 @join__type(graph: PRODUCTS)
1110{
1111 pattern: String!
1112 vendorName: String
1113}
1114
1115type User
1116 @join__type(graph: ACCOUNTS, key: "id")
1117 @join__type(graph: PRODUCTS, key: "id", resolvable: false)
1118 @join__type(graph: REVIEWS, key: "id")
1119{
1120 id: ID!
1121 name: String @join__field(graph: ACCOUNTS)
1122 email: String @join__field(graph: ACCOUNTS)
1123 password: String @join__field(graph: ACCOUNTS)
1124 nickname: String @join__field(graph: ACCOUNTS, override: "reviews")
1125 reviews: [Review] @join__field(graph: REVIEWS)
1126}
1127 "#;
1128
/// Plans an operation whose root field and sub-selections are all attributed to the
/// `accounts` subgraph in `TEST_SUPERGRAPH`, and checks that the rendered plan is a
/// single `Fetch` against that service.
#[test]
fn plan_simple_query_for_single_subgraph() {
    // `userById`, `name` and `email` all carry `@join__field(graph: ACCOUNTS)`.
    let operation_text = r#"
        {
            userById(id: 1) {
                name
                email
            }
        }
    "#;

    let schema = Supergraph::new(TEST_SUPERGRAPH).unwrap();
    let query_planner = QueryPlanner::new(&schema, Default::default()).unwrap();

    // The operation is validated against the planner's API schema before planning.
    let parsed = ExecutableDocument::parse_and_validate(
        query_planner.api_schema().schema(),
        operation_text,
        "operation.graphql",
    )
    .unwrap();

    let query_plan = query_planner
        .build_query_plan(&parsed, None, Default::default())
        .unwrap();
    insta::assert_snapshot!(query_plan, @r###"
    QueryPlan {
      Fetch(service: "accounts") {
        {
          userById(id: 1) {
            name
            email
          }
        }
      },
    }
    "###);
}
1163
/// Plans an operation that has to hop across subgraphs: `bestRatedProducts` is a
/// `reviews` root field, `Product.vendor` is attributed to `products`, and
/// `User.name` to `accounts`. The expected plan is a three-step `Sequence`.
#[test]
fn plan_simple_query_for_multiple_subgraphs() {
    let operation_text = r#"
        {
            bestRatedProducts {
                vendor { name }
            }
        }
    "#;

    let schema = Supergraph::new(TEST_SUPERGRAPH).unwrap();
    let query_planner = QueryPlanner::new(&schema, Default::default()).unwrap();

    // The operation is validated against the planner's API schema before planning.
    let parsed = ExecutableDocument::parse_and_validate(
        query_planner.api_schema().schema(),
        operation_text,
        "operation.graphql",
    )
    .unwrap();

    let query_plan = query_planner
        .build_query_plan(&parsed, None, Default::default())
        .unwrap();
    // Expected shape: reviews root fetch -> products entity fetch (vendor keys)
    // -> accounts entity fetch (vendor name).
    insta::assert_snapshot!(query_plan, @r###"
    QueryPlan {
      Sequence {
        Fetch(service: "reviews") {
          {
            bestRatedProducts {
              __typename
              ... on Book {
                __typename
                id
              }
              ... on Movie {
                __typename
                id
              }
            }
          }
        },
        Flatten(path: "bestRatedProducts.@") {
          Fetch(service: "products") {
            {
              ... on Book {
                __typename
                id
              }
              ... on Movie {
                __typename
                id
              }
            } =>
            {
              ... on Book {
                vendor {
                  __typename
                  id
                }
              }
              ... on Movie {
                vendor {
                  __typename
                  id
                }
              }
            }
          },
        },
        Flatten(path: "bestRatedProducts.@.vendor") {
          Fetch(service: "accounts") {
            {
              ... on User {
                __typename
                id
              }
            } =>
            {
              ... on User {
                name
              }
            }
          },
        },
      },
    }
    "###);
}
1249
/// Plans an operation with two root fields that live in different subgraphs
/// (`userById` in `accounts`, `bestRatedProducts` in `reviews`). The expected plan
/// runs both branches under a `Parallel` node, with the products branch needing a
/// follow-up entity fetch for `avg_rating`.
#[test]
fn plan_simple_root_field_query_for_multiple_subgraphs() {
    let operation_text = r#"
        {
            userById(id: 1) {
                name
                email
            }
            bestRatedProducts {
                id
                avg_rating
            }
        }
    "#;

    let schema = Supergraph::new(TEST_SUPERGRAPH).unwrap();
    let query_planner = QueryPlanner::new(&schema, Default::default()).unwrap();

    // The operation is validated against the planner's API schema before planning.
    let parsed = ExecutableDocument::parse_and_validate(
        query_planner.api_schema().schema(),
        operation_text,
        "operation.graphql",
    )
    .unwrap();

    let query_plan = query_planner
        .build_query_plan(&parsed, None, Default::default())
        .unwrap();
    // The snapshot shows `reviews { rating }` being fetched from `reviews` and passed
    // as input to the `products` entity fetch that resolves `avg_rating`.
    insta::assert_snapshot!(query_plan, @r###"
    QueryPlan {
      Parallel {
        Fetch(service: "accounts") {
          {
            userById(id: 1) {
              name
              email
            }
          }
        },
        Sequence {
          Fetch(service: "reviews") {
            {
              bestRatedProducts {
                __typename
                id
                ... on Book {
                  __typename
                  id
                  reviews {
                    rating
                  }
                }
                ... on Movie {
                  __typename
                  id
                  reviews {
                    rating
                  }
                }
              }
            }
          },
          Flatten(path: "bestRatedProducts.@") {
            Fetch(service: "products") {
              {
                ... on Book {
                  __typename
                  id
                  reviews {
                    rating
                  }
                }
                ... on Movie {
                  __typename
                  id
                  reviews {
                    rating
                  }
                }
              } =>
              {
                ... on Book {
                  avg_rating
                }
                ... on Movie {
                  avg_rating
                }
              }
            },
          },
        },
      },
    }
    "###);
}
1342
/// Plans an operation that uses a named fragment with `generate_query_fragments`
/// switched on, and checks that the rendered plan contains no fragment definitions:
/// `...userFields` shows up expanded in place inside the single `accounts` fetch.
#[test]
fn test_optimize_no_fragments_generated() {
    // Everything except fragment generation keeps its default setting.
    let planner_config = QueryPlannerConfig {
        generate_query_fragments: true,
        ..Default::default()
    };

    let operation_text = r#"
        {
            userById(id: 1) {
                id
                ...userFields
            },
            another_user: userById(id: 2) {
                name
                email
            }
        }
        fragment userFields on User {
            name
            email
        }
    "#;

    let schema = Supergraph::new(TEST_SUPERGRAPH).unwrap();

    // Here the operation is validated against an API schema derived directly from the
    // supergraph, not the one exposed by the planner.
    let api = schema.to_api_schema(Default::default()).unwrap();
    let parsed =
        ExecutableDocument::parse_and_validate(api.schema(), operation_text, "operation.graphql")
            .unwrap();

    let query_planner = QueryPlanner::new(&schema, planner_config).unwrap();
    let query_plan = query_planner
        .build_query_plan(&parsed, None, Default::default())
        .unwrap();
    insta::assert_snapshot!(query_plan, @r###"
    QueryPlan {
      Fetch(service: "accounts") {
        {
          userById(id: 1) {
            id
            name
            email
          }
          another_user: userById(id: 2) {
            name
            email
          }
        }
      },
    }
    "###);
}
1395
/// Plans an operation that selects `__typename` directly on the query root and checks
/// that the rendered fetch does not carry that root-level `__typename`; only the
/// `bestRatedProducts` selection is sent to `reviews`.
#[test]
fn drop_operation_root_level_typename() {
    let operation_text = r#"
        {
            __typename
            bestRatedProducts {
                id
            }
        }
    "#;

    let schema = Supergraph::new(TEST_SUPERGRAPH).unwrap();
    let query_planner = QueryPlanner::new(&schema, Default::default()).unwrap();

    // The operation is validated against the planner's API schema before planning.
    let parsed = ExecutableDocument::parse_and_validate(
        query_planner.api_schema().schema(),
        operation_text,
        "operation.graphql",
    )
    .unwrap();

    let query_plan = query_planner
        .build_query_plan(&parsed, None, Default::default())
        .unwrap();
    // The `__typename` inside `bestRatedProducts` is still present in the expected plan;
    // only the one at the operation root is absent.
    insta::assert_snapshot!(query_plan, @r###"
    QueryPlan {
      Fetch(service: "reviews") {
        {
          bestRatedProducts {
            __typename
            id
          }
        }
      },
    }
    "###);
}
1431
/// Round-trips a `QueryPlanningStatistics` whose `best_plan_cost` is NaN through JSON:
/// the cost must be written as `null`, and reading that `null` back must yield NaN.
#[test]
fn test_query_plan_statistics_nan_cost() {
    let statistics = QueryPlanningStatistics {
        evaluated_plan_count: Cell::new(10),
        evaluated_plan_paths: Cell::new(20),
        best_plan_cost: f64::NAN,
    };

    // Serialization side: NaN has no JSON number form, so the snapshot expects `null`.
    let json = serde_json::to_string_pretty(&statistics).expect("Serializing");
    insta::assert_snapshot!(json, @r###"
    {
      "evaluated_plan_count": 10,
      "evaluated_plan_paths": 20,
      "best_plan_cost": null
    }
    "###);

    // Deserialization side: the `null` written above must come back as NaN.
    let round_tripped: QueryPlanningStatistics =
        serde_json::from_str(&json).expect("Deserializing");
    assert!(round_tripped.best_plan_cost.is_nan());
}
1452}