1use std::cell::Cell;
2use std::num::NonZeroU32;
3use std::ops::ControlFlow;
4use std::sync::Arc;
5
6use apollo_compiler::ExecutableDocument;
7use apollo_compiler::Name;
8use apollo_compiler::collections::IndexMap;
9use apollo_compiler::collections::IndexSet;
10use apollo_compiler::validation::Valid;
11use itertools::Itertools;
12use petgraph::visit::EdgeRef;
13use serde::Deserialize;
14use serde::Serialize;
15use tracing::trace;
16
17use super::ConditionNode;
18use super::QueryPlanCost;
19use super::fetch_dependency_graph::FetchIdGenerator;
20use crate::ApiSchemaOptions;
21use crate::Supergraph;
22use crate::bail;
23use crate::error::FederationError;
24use crate::error::SingleFederationError;
25use crate::internal_error;
26use crate::operation::NormalizedDefer;
27use crate::operation::Operation;
28use crate::operation::SelectionSet;
29use crate::operation::normalize_operation;
30use crate::query_graph::OverrideConditions;
31use crate::query_graph::QueryGraph;
32use crate::query_graph::QueryGraphNodeType;
33use crate::query_graph::build_federated_query_graph;
34use crate::query_graph::path_tree::OpPathTree;
35use crate::query_plan::PlanNode;
36use crate::query_plan::QueryPlan;
37use crate::query_plan::SequenceNode;
38use crate::query_plan::TopLevelPlanNode;
39use crate::query_plan::fetch_dependency_graph::FetchDependencyGraph;
40use crate::query_plan::fetch_dependency_graph::FetchDependencyGraphNodePath;
41use crate::query_plan::fetch_dependency_graph::compute_nodes_for_tree;
42use crate::query_plan::fetch_dependency_graph_processor::FetchDependencyGraphProcessor;
43use crate::query_plan::fetch_dependency_graph_processor::FetchDependencyGraphToCostProcessor;
44use crate::query_plan::fetch_dependency_graph_processor::FetchDependencyGraphToQueryPlanProcessor;
45use crate::query_plan::query_planning_traversal::BestQueryPlanInfo;
46use crate::query_plan::query_planning_traversal::QueryPlanningParameters;
47use crate::query_plan::query_planning_traversal::QueryPlanningTraversal;
48use crate::query_plan::query_planning_traversal::convert_type_from_subgraph;
49use crate::query_plan::query_planning_traversal::non_local_selections_estimation;
50use crate::schema::ValidFederationSchema;
51use crate::schema::position::AbstractTypeDefinitionPosition;
52use crate::schema::position::CompositeTypeDefinitionPosition;
53use crate::schema::position::InterfaceTypeDefinitionPosition;
54use crate::schema::position::ObjectTypeDefinitionPosition;
55use crate::schema::position::OutputTypeDefinitionPosition;
56use crate::schema::position::SchemaRootDefinitionKind;
57use crate::schema::position::TypeDefinitionPosition;
58use crate::utils::logging::snapshot;
59
/// Configuration of a [`QueryPlanner`].
///
/// `QueryPlanner::new` stores the value it receives, and `build_query_plan` clones it into
/// the planning parameters of every call.
#[derive(Debug, Clone, Hash, Serialize)]
pub struct QueryPlannerConfig {
    /// When `true`, `build_query_plan` hands
    /// `SubgraphOperationCompression::GenerateFragments` to the query plan processor;
    /// otherwise it uses `SubgraphOperationCompression::Disabled`.
    pub generate_query_fragments: bool,

    /// NOTE(review): not read anywhere in this file; presumably toggles validation of the
    /// generated subgraph operations — confirm against the consumer.
    pub subgraph_graphql_validation: bool,

    /// Incremental delivery settings. `enable_defer` is forwarded as
    /// `ApiSchemaOptions::include_defer` when `QueryPlanner::new` builds the API schema.
    pub incremental_delivery: QueryPlanIncrementalDeliveryConfig,

    /// Debug settings. NOTE(review): not read in this file; they travel with the cloned
    /// config into the planning parameters.
    pub debug: QueryPlannerDebugConfig,

    /// Forwarded to `compute_root_fetch_groups` as `type_conditioned_fetching_enabled`
    /// when a mutation's dependency graph is rebuilt.
    pub type_conditioned_fetching: bool,
}
97
98#[allow(clippy::derivable_impls)] impl Default for QueryPlannerConfig {
100 fn default() -> Self {
101 Self {
102 generate_query_fragments: false,
103 subgraph_graphql_validation: false,
104 incremental_delivery: Default::default(),
105 debug: Default::default(),
106 type_conditioned_fetching: false,
107 }
108 }
109}
110
/// Incremental delivery settings for the query planner.
#[derive(Debug, Clone, Default, Hash, Serialize)]
pub struct QueryPlanIncrementalDeliveryConfig {
    /// Forwarded by `QueryPlanner::new` as `ApiSchemaOptions::include_defer` when the API
    /// schema is generated. The derived `Default` leaves it `false`.
    #[serde(default)]
    pub enable_defer: bool,
}
125
/// Debug-oriented settings for the query planner.
///
/// NOTE(review): neither field is read in this file; the descriptions below are inferred
/// from the field names and must be confirmed against the planning traversal.
#[derive(Debug, Clone, Hash, Serialize)]
pub struct QueryPlannerDebugConfig {
    /// Presumably an upper bound on the number of candidate plans evaluated for one
    /// operation — TODO confirm. The `Default` impl sets it to 10 000.
    pub max_evaluated_plans: NonZeroU32,

    /// Presumably an optional cap on the number of paths considered while planning —
    /// TODO confirm. The `Default` impl sets it to `None`.
    pub paths_limit: Option<u32>,
}
161
162impl Default for QueryPlannerDebugConfig {
163 fn default() -> Self {
164 Self {
165 max_evaluated_plans: NonZeroU32::new(10_000).unwrap(),
166 paths_limit: None,
167 }
168 }
169}
170
/// Statistics attached to a `QueryPlan`.
///
/// `build_query_plan` creates a default instance, lends it to the planning parameters by
/// shared reference, and finally moves it into the returned plan with `best_plan_cost`
/// filled in.
#[derive(Debug, PartialEq, Default, Serialize, Deserialize)]
pub struct QueryPlanningStatistics {
    /// Stored in a `Cell` so it can be updated through the shared reference held by the
    /// planning parameters. NOTE(review): presumably the number of plans evaluated —
    /// the code that updates it is not in this file.
    pub evaluated_plan_count: Cell<usize>,
    /// NOTE(review): presumably the number of paths examined while evaluating plans —
    /// the code that updates it is not in this file.
    pub evaluated_plan_paths: Cell<usize>,
    /// Cost reported by the planning step for the returned plan. For mutation operations
    /// `compute_plan_internal` reports `f64::NAN`.
    pub best_plan_cost: f64,
}
179
/// Per-call options accepted by `QueryPlanner::build_query_plan`.
#[derive(Clone)]
pub struct QueryPlanOptions<'a> {
    /// Labels that `build_query_plan` collects into an `IndexSet` and passes to
    /// `OverrideConditions::new` together with the federated query graph.
    pub override_conditions: Vec<String>,
    /// Optional cancellation callback. It is consulted during operation normalization
    /// (through `QueryPlanningParameters::check_cancellation_with`) and is also stored in
    /// the planning parameters.
    /// NOTE(review): presumably `ControlFlow::Break` requests cancellation — confirm.
    pub check_for_cooperative_cancellation: Option<&'a dyn Fn() -> ControlFlow<()>>,
    /// When `true` (the default), planning runs with a fresh
    /// `non_local_selections_estimation::State`; when `false`, no state is created.
    pub non_local_selections_limit_enabled: bool,
    /// Subgraph names to disable: every subgraph of the query graph whose name appears
    /// here is collected into `QueryPlanningParameters::disabled_subgraphs`.
    pub disabled_subgraph_names: IndexSet<String>,
}
207
208impl Default for QueryPlanOptions<'_> {
209 fn default() -> Self {
210 Self {
211 override_conditions: Vec::new(),
212 check_for_cooperative_cancellation: None,
213 non_local_selections_limit_enabled: true,
214 disabled_subgraph_names: Default::default(),
215 }
216 }
217}
218
219impl std::fmt::Debug for QueryPlanOptions<'_> {
220 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
221 f.debug_struct("QueryPlanOptions")
222 .field("override_conditions", &self.override_conditions)
223 .field(
224 "check_for_cooperative_cancellation",
225 if self.check_for_cooperative_cancellation.is_some() {
226 &"Some(...)"
227 } else {
228 &"None"
229 },
230 )
231 .field(
232 "non_local_selections_limit_enabled",
233 &self.non_local_selections_limit_enabled,
234 )
235 .finish()
236 }
237}
238
/// Query planner for one supergraph; built once by `QueryPlanner::new` and then used by
/// `build_query_plan` for each operation.
pub struct QueryPlanner {
    /// Planner configuration; cloned into the planning parameters of each call.
    config: QueryPlannerConfig,
    /// Query graph produced by `build_federated_query_graph` from the supergraph and API
    /// schemas.
    federated_query_graph: Arc<QueryGraph>,
    /// Clone of the schema of the `Supergraph` passed to `new`.
    supergraph_schema: ValidFederationSchema,
    /// API schema derived from the supergraph; operations are normalized against it.
    api_schema: ValidFederationSchema,
    /// Interface types of the supergraph for which at least one subgraph reports the
    /// same-named type as an interface object (`is_interface_object_type`).
    interface_types_with_interface_objects: IndexSet<InterfaceTypeDefinitionPosition>,
    /// Names of abstract types whose set of runtime object types is not the same in every
    /// subgraph that defines them as an interface or union.
    abstract_types_with_inconsistent_runtime_types: IndexSet<Name>,
}
253
impl QueryPlanner {
    /// Creates a planner for `supergraph` using `config`.
    ///
    /// Derives the API schema (including defer support when
    /// `config.incremental_delivery.enable_defer` is set), builds the federated query
    /// graph, and precomputes two sets used during planning: the interface types that have
    /// an interface object in some subgraph, and the abstract types whose runtime types
    /// differ between subgraphs.
    ///
    /// # Errors
    /// Propagates any error from API schema generation, query graph construction, or the
    /// interface-object checks.
    #[cfg_attr(
        feature = "snapshot_tracing",
        tracing::instrument(level = "trace", skip_all, name = "QueryPlanner::new")
    )]
    pub fn new(
        supergraph: &Supergraph,
        config: QueryPlannerConfig,
    ) -> Result<Self, FederationError> {
        let supergraph_schema = supergraph.schema.clone();
        let api_schema = supergraph.to_api_schema(ApiSchemaOptions {
            include_defer: config.incremental_delivery.enable_defer,
            ..Default::default()
        })?;
        // NOTE(review): the two `Some(true)` arguments are positional options of
        // `build_federated_query_graph`; their meaning is not visible from this file.
        let query_graph = build_federated_query_graph(
            supergraph_schema.clone(),
            api_schema.clone(),
            Some(true),
            Some(true),
        )?;

        // Keep the supergraph interface types for which at least one subgraph reports the
        // same-named type as an interface object.
        let interface_types_with_interface_objects = supergraph
            .schema
            .get_types()
            .filter_map(|position| match position {
                TypeDefinitionPosition::Interface(interface_position) => Some(interface_position),
                _ => None,
            })
            .map(|position| {
                let is_interface_object = query_graph
                    .subgraphs()
                    .map(|(_name, schema)| {
                        // A subgraph that does not define the type cannot contribute.
                        let Some(position) = schema.try_get_type(position.type_name.clone()) else {
                            return Ok(false);
                        };
                        schema.is_interface_object_type(position)
                    })
                    .process_results(|mut iter| iter.any(|b| b))?;
                Ok::<_, FederationError>((position, is_interface_object))
            })
            .process_results(|iter| {
                iter.flat_map(|(position, is_interface_object)| {
                    if is_interface_object {
                        Some(position)
                    } else {
                        None
                    }
                })
                .collect::<IndexSet<_>>()
            })?;

        // An abstract type is "inconsistent" when the subgraphs that define it as an
        // interface or union do not all agree on its set of object types.
        let is_inconsistent = |position: AbstractTypeDefinitionPosition| {
            // One entry per subgraph that defines the type as an interface or a union;
            // subgraphs where it is an object (or anything else) are skipped.
            let mut sources = query_graph.subgraphs().filter_map(|(_name, subgraph)| {
                match subgraph.try_get_type(position.type_name().clone())? {
                    TypeDefinitionPosition::Object(_) => None,
                    // Object types recorded by the referencers for this interface.
                    TypeDefinitionPosition::Interface(interface) => Some(
                        subgraph
                            .referencers()
                            .get_interface_type(&interface.type_name)
                            .ok()?
                            .object_types
                            .clone(),
                    ),
                    // Members of the union as declared in this subgraph.
                    TypeDefinitionPosition::Union(union_) => Some(
                        union_
                            .try_get(subgraph.schema())?
                            .members
                            .iter()
                            .map(|member| ObjectTypeDefinitionPosition::new(member.name.clone()))
                            .collect(),
                    ),
                    _ => None,
                }
            });

            // With no defining subgraph there is nothing to disagree about.
            let Some(expected_runtimes) = sources.next() else {
                return false;
            };
            !sources.all(|runtimes| runtimes == expected_runtimes)
        };

        let abstract_types_with_inconsistent_runtime_types = supergraph
            .schema
            .get_types()
            .filter_map(|position| AbstractTypeDefinitionPosition::try_from(position).ok())
            .filter(|position| is_inconsistent(position.clone()))
            .map(|position| position.type_name().clone())
            .collect::<IndexSet<_>>();

        Ok(Self {
            config,
            federated_query_graph: Arc::new(query_graph),
            supergraph_schema,
            api_schema,
            interface_types_with_interface_objects,
            abstract_types_with_inconsistent_runtime_types,
        })
    }

    /// Returns the subgraph schemas of the federated query graph, keyed by subgraph name.
    pub fn subgraph_schemas(&self) -> &IndexMap<Arc<str>, ValidFederationSchema> {
        self.federated_query_graph.subgraph_schemas()
    }

    /// Builds the query plan for one operation of `document`.
    ///
    /// `operation_name` selects the operation; with `None` the lookup is delegated to
    /// `document.operations.get(None)`. The operation is normalized against the API
    /// schema, its defers are normalized, and the result is planned either directly or
    /// once per combination of defer conditions.
    ///
    /// Returns an empty default `QueryPlan` when the normalized selection set is empty.
    ///
    /// # Errors
    /// - `UnknownOperation` when a name was given but the lookup failed, and
    ///   `OperationNameNotProvided` when the lookup failed without a name.
    /// - An error when the operation's selection set is empty.
    /// - `DeferredSubscriptionUnsupported` when a subscription contains defers.
    /// - An error when the query graph has no root for the operation's root kind, or when
    ///   a subscription plan does not start with a fetch.
    /// - Any error raised by normalization or planning.
    #[cfg_attr(
        feature = "snapshot_tracing",
        tracing::instrument(level = "trace", skip_all, name = "QueryPlanner::build_query_plan")
    )]
    pub fn build_query_plan(
        &self,
        document: &Valid<ExecutableDocument>,
        operation_name: Option<Name>,
        options: QueryPlanOptions,
    ) -> Result<QueryPlan, FederationError> {
        let operation = document
            .operations
            .get(operation_name.as_ref().map(|name| name.as_str()))
            .map_err(|_| {
                if operation_name.is_some() {
                    SingleFederationError::UnknownOperation
                } else {
                    SingleFederationError::OperationNameNotProvided
                }
            })?;
        if operation.selection_set.is_empty() {
            crate::bail!("Invalid operation: empty selection set")
        }

        let is_subscription = operation.is_subscription();

        // Borrowed by the planning parameters below and moved into the plan at the end.
        let statistics = QueryPlanningStatistics::default();

        let normalized_operation = normalize_operation(
            operation,
            &document.fragments,
            &self.api_schema,
            &self.interface_types_with_interface_objects,
            &|| {
                QueryPlanningParameters::check_cancellation_with(
                    &options.check_for_cooperative_cancellation,
                )
            },
        )?;

        let NormalizedDefer {
            operation: normalized_operation,
            assigned_defer_labels,
            defer_conditions,
            has_defers,
        } = normalized_operation.with_normalized_defer()?;
        if has_defers && is_subscription {
            return Err(SingleFederationError::DeferredSubscriptionUnsupported.into());
        }

        // Nothing left to fetch after normalization: return an empty plan.
        if normalized_operation.selection_set.is_empty() {
            return Ok(QueryPlan::default());
        }

        snapshot!(
            "NormalizedOperation",
            serde_json_bytes::json!({
                "original": &operation.serialize().to_string(),
                "normalized": &normalized_operation.to_string()
            })
            .to_string(),
            "normalized operation"
        );

        // Starting node of the query graph for this operation's root kind.
        let Some(root) = self
            .federated_query_graph
            .root_kinds_to_nodes()?
            .get(&normalized_operation.root_kind)
        else {
            bail!(
                "Shouldn't have a {0} operation if the subgraphs don't have a {0} root",
                normalized_operation.root_kind
            )
        };

        let operation_compression = if self.config.generate_query_fragments {
            SubgraphOperationCompression::GenerateFragments
        } else {
            SubgraphOperationCompression::Disabled
        };
        let mut processor = FetchDependencyGraphToQueryPlanProcessor::new(
            normalized_operation.variables.clone(),
            normalized_operation.directives.clone(),
            operation_compression,
            operation.name.clone(),
            assigned_defer_labels,
        );
        let mut parameters = QueryPlanningParameters {
            supergraph_schema: self.supergraph_schema.clone(),
            federated_query_graph: self.federated_query_graph.clone(),
            operation: Arc::new(normalized_operation),
            head: *root,
            head_must_be_root: true,
            statistics: &statistics,
            abstract_types_with_inconsistent_runtime_types: self
                .abstract_types_with_inconsistent_runtime_types
                .clone()
                .into(),
            config: self.config.clone(),
            override_conditions: OverrideConditions::new(
                &self.federated_query_graph,
                &IndexSet::from_iter(options.override_conditions),
            ),
            check_for_cooperative_cancellation: options.check_for_cooperative_cancellation,
            fetch_id_generator: Arc::new(FetchIdGenerator::new()),
            // Only names that match an actual subgraph of the query graph are kept.
            disabled_subgraphs: self
                .federated_query_graph
                .subgraphs()
                .filter_map(|(subgraph, _)| {
                    if options.disabled_subgraph_names.contains(subgraph.as_ref()) {
                        Some(subgraph.clone())
                    } else {
                        None
                    }
                })
                .collect(),
        };

        // `None` when the non-local selections limit is disabled.
        let mut non_local_selection_state = options
            .non_local_selections_limit_enabled
            .then(non_local_selections_estimation::State::default);
        let (root_node, cost) = if !defer_conditions.is_empty() {
            compute_plan_for_defer_conditionals(
                &mut parameters,
                &mut processor,
                defer_conditions,
                &mut non_local_selection_state,
            )
        } else {
            compute_plan_internal(
                &mut parameters,
                &mut processor,
                has_defers,
                &mut non_local_selection_state,
            )
        }?;

        // Convert the plan root into a top-level node. Subscriptions are wrapped in a
        // `SubscriptionNode` whose primary fetch is the first (or only) fetch of the plan.
        let root_node = match root_node {
            Some(PlanNode::Fetch(root_node)) if is_subscription => Some(
                TopLevelPlanNode::Subscription(crate::query_plan::SubscriptionNode {
                    primary: root_node,
                    rest: None,
                }),
            ),
            Some(PlanNode::Sequence(root_node)) if is_subscription => {
                let Some((primary, rest)) = root_node.nodes.split_first() else {
                    bail!("Invalid query plan: Sequence must have at least one node");
                };
                let PlanNode::Fetch(primary) = primary.clone() else {
                    bail!("Invalid query plan: Primary node of a subscription is not a Fetch");
                };
                // Everything after the primary fetch becomes the `rest` sequence.
                let rest = PlanNode::Sequence(SequenceNode {
                    nodes: rest.to_vec(),
                });
                Some(TopLevelPlanNode::Subscription(
                    crate::query_plan::SubscriptionNode {
                        primary,
                        rest: Some(Box::new(rest)),
                    },
                ))
            }
            Some(node) if is_subscription => {
                bail!(
                    "Invalid query plan for subscription: unexpected {} at root",
                    node.node_kind()
                );
            }
            Some(PlanNode::Fetch(inner)) => Some(TopLevelPlanNode::Fetch(inner)),
            Some(PlanNode::Sequence(inner)) => Some(TopLevelPlanNode::Sequence(inner)),
            Some(PlanNode::Parallel(inner)) => Some(TopLevelPlanNode::Parallel(inner)),
            Some(PlanNode::Flatten(inner)) => Some(TopLevelPlanNode::Flatten(inner)),
            Some(PlanNode::Defer(inner)) => Some(TopLevelPlanNode::Defer(inner)),
            Some(PlanNode::Condition(inner)) => Some(TopLevelPlanNode::Condition(inner)),
            None => None,
        };

        let plan = QueryPlan {
            node: root_node,
            statistics: QueryPlanningStatistics {
                best_plan_cost: cost,
                ..statistics
            },
        };

        snapshot!(
            "QueryPlan",
            plan.to_string(),
            "QueryPlan from build_query_plan"
        );
        snapshot!(
            plan.statistics,
            "QueryPlanningStatistics from build_query_plan"
        );

        Ok(plan)
    }

    /// Returns the API schema derived from the supergraph in `new`.
    pub fn api_schema(&self) -> &ValidFederationSchema {
        &self.api_schema
    }

    /// Returns the supergraph schema this planner was built from.
    pub fn supergraph_schema(&self) -> &ValidFederationSchema {
        &self.supergraph_schema
    }

    /// Returns the override condition labels known to the federated query graph.
    pub fn override_condition_labels(&self) -> &IndexSet<Arc<str>> {
        self.federated_query_graph.override_condition_labels()
    }
}
579
580fn compute_root_serial_dependency_graph_for_mutation(
581 parameters: &QueryPlanningParameters,
582 has_defers: bool,
583 non_local_selection_state: &mut Option<non_local_selections_estimation::State>,
584) -> Result<Vec<FetchDependencyGraph>, FederationError> {
585 let QueryPlanningParameters {
586 supergraph_schema,
587 federated_query_graph,
588 operation,
589 ..
590 } = parameters;
591 let root_type: Option<CompositeTypeDefinitionPosition> = if has_defers {
592 supergraph_schema
593 .schema()
594 .root_operation(operation.root_kind.into())
595 .and_then(|name| supergraph_schema.get_type(name.clone()).ok())
596 .and_then(|ty| ty.try_into().ok())
597 } else {
598 None
599 };
600 let mut split_roots = operation.selection_set.clone().split_top_level_fields();
602 let mut digest = Vec::new();
603 let selection_set = split_roots
604 .next()
605 .ok_or_else(|| FederationError::internal("Empty top level fields"))?;
606 let BestQueryPlanInfo {
607 mut fetch_dependency_graph,
608 path_tree: mut prev_path,
609 ..
610 } = compute_root_parallel_best_plan_for_mutation(
611 parameters,
612 selection_set,
613 has_defers,
614 non_local_selection_state,
615 )?;
616 let mut prev_subgraph = only_root_subgraph(&fetch_dependency_graph)?;
617 for selection_set in split_roots {
618 let BestQueryPlanInfo {
619 fetch_dependency_graph: new_dep_graph,
620 path_tree: new_path,
621 ..
622 } = compute_root_parallel_best_plan_for_mutation(
623 parameters,
624 selection_set,
625 has_defers,
626 non_local_selection_state,
627 )?;
628 let new_subgraph = only_root_subgraph(&new_dep_graph)?;
629 if new_subgraph == prev_subgraph {
630 Arc::make_mut(&mut prev_path).extend(&new_path);
640 fetch_dependency_graph = FetchDependencyGraph::new(
641 supergraph_schema.clone(),
642 federated_query_graph.clone(),
643 root_type.clone(),
644 fetch_dependency_graph.fetch_id_generation.clone(),
645 );
646 compute_root_fetch_groups(
647 operation.root_kind,
648 federated_query_graph,
649 &mut fetch_dependency_graph,
650 &prev_path,
651 parameters.config.type_conditioned_fetching,
652 &|| parameters.check_cancellation(),
653 )?;
654 } else {
655 digest.push(std::mem::replace(
660 &mut fetch_dependency_graph,
661 new_dep_graph,
662 ));
663 prev_path = new_path;
664 prev_subgraph = new_subgraph;
665 }
666 }
667 digest.push(fetch_dependency_graph);
668 Ok(digest)
669}
670
671fn only_root_subgraph(graph: &FetchDependencyGraph) -> Result<Arc<str>, FederationError> {
672 let mut iter = graph.root_node_by_subgraph_iter();
673 let (Some((name, _)), None) = (iter.next(), iter.next()) else {
674 return Err(FederationError::internal(format!(
675 "{graph} should have only one root."
676 )));
677 };
678 Ok(name.clone())
679}
680
681#[cfg_attr(
682 feature = "snapshot_tracing",
683 tracing::instrument(level = "trace", skip_all, name = "compute_root_fetch_groups")
684)]
685pub(crate) fn compute_root_fetch_groups(
686 root_kind: SchemaRootDefinitionKind,
687 federated_query_graph: &QueryGraph,
688 dependency_graph: &mut FetchDependencyGraph,
689 path: &OpPathTree,
690 type_conditioned_fetching_enabled: bool,
691 check_cancellation: &dyn Fn() -> Result<(), SingleFederationError>,
692) -> Result<(), FederationError> {
693 for child in &path.childs {
700 let edge = child.edge.expect("The root edge should not be None");
701 let (_source_node, target_node) = path.graph.edge_endpoints(edge)?;
702 let target_node = path.graph.node_weight(target_node)?;
703 let subgraph_name = &target_node.source;
704 let root_type: CompositeTypeDefinitionPosition = match &target_node.type_ {
705 QueryGraphNodeType::SchemaType(OutputTypeDefinitionPosition::Object(object)) => {
706 object.clone().into()
707 }
708 ty => {
709 return Err(FederationError::internal(format!(
710 "expected an object type for the root of a subgraph, found {ty}"
711 )));
712 }
713 };
714 let fetch_dependency_node = dependency_graph.get_or_create_root_node(
715 subgraph_name,
716 root_kind,
717 root_type.clone(),
718 )?;
719 snapshot!(
720 "FetchDependencyGraph",
721 dependency_graph.to_dot(),
722 "tree_with_root_node"
723 );
724 let subgraph_schema = federated_query_graph.schema_by_source(subgraph_name)?;
725 let supergraph_root_type = convert_type_from_subgraph(
726 root_type,
727 subgraph_schema,
728 &dependency_graph.supergraph_schema,
729 )?;
730 compute_nodes_for_tree(
731 dependency_graph,
732 &child.tree,
733 fetch_dependency_node,
734 FetchDependencyGraphNodePath::new(
735 dependency_graph.supergraph_schema.clone(),
736 type_conditioned_fetching_enabled,
737 supergraph_root_type,
738 )?,
739 Default::default(),
740 &Default::default(),
741 check_cancellation,
742 )?;
743 }
744 Ok(())
745}
746
747fn compute_root_parallel_dependency_graph(
748 parameters: &QueryPlanningParameters,
749 has_defers: bool,
750 non_local_selection_state: &mut Option<non_local_selections_estimation::State>,
751) -> Result<(FetchDependencyGraph, QueryPlanCost), FederationError> {
752 trace!("Starting process to construct a parallel fetch dependency graph");
753 let selection_set = parameters.operation.selection_set.clone();
754 let best_plan = compute_root_parallel_best_plan(
755 parameters,
756 selection_set,
757 has_defers,
758 non_local_selection_state,
759 )?;
760 snapshot!(
761 "FetchDependencyGraph",
762 best_plan.fetch_dependency_graph.to_dot(),
763 "Fetch dependency graph returned from compute_root_parallel_best_plan"
764 );
765 Ok((best_plan.fetch_dependency_graph, best_plan.cost))
766}
767
768fn compute_root_parallel_best_plan(
769 parameters: &QueryPlanningParameters,
770 selection: SelectionSet,
771 has_defers: bool,
772 non_local_selection_state: &mut Option<non_local_selections_estimation::State>,
773) -> Result<BestQueryPlanInfo, FederationError> {
774 let planning_traversal = QueryPlanningTraversal::new(
775 parameters,
776 selection,
777 has_defers,
778 parameters.operation.root_kind,
779 FetchDependencyGraphToCostProcessor,
780 non_local_selection_state.as_mut(),
781 None,
782 )?;
783
784 Ok(planning_traversal
787 .find_best_plan()?
788 .unwrap_or_else(|| BestQueryPlanInfo::empty(parameters)))
789}
790
791fn compute_root_parallel_best_plan_for_mutation(
792 parameters: &QueryPlanningParameters,
793 selection: SelectionSet,
794 has_defers: bool,
795 non_local_selection_state: &mut Option<non_local_selections_estimation::State>,
796) -> Result<BestQueryPlanInfo, FederationError> {
797 parameters.federated_query_graph.out_edges(parameters.head).into_iter().map(|edge_ref| {
798 let mutation_subgraph = parameters.federated_query_graph.node_weight(edge_ref.target())?.source.clone();
799 let planning_traversal = QueryPlanningTraversal::new(
800 parameters,
801 selection.clone(),
802 has_defers,
803 parameters.operation.root_kind,
804 FetchDependencyGraphToCostProcessor,
805 non_local_selection_state.as_mut(),
806 Some(mutation_subgraph),
807 )?;
808 planning_traversal.find_best_plan()
809 }).process_results(|iter| iter
810 .flatten()
811 .min_by(|a, b| a.cost.total_cmp(&b.cost))
812 .map(Ok)
813 .unwrap_or_else(|| {
814 if parameters.disabled_subgraphs.is_empty() {
815 Err(FederationError::internal(format!(
816 "Was not able to plan {} starting from a single subgraph: This shouldn't have happened.",
817 parameters.operation,
818 )))
819 } else {
820 Err(SingleFederationError::NoPlanFoundWithDisabledSubgraphs.into())
823 }
824 })
825 )?
826}
827
828fn compute_plan_internal(
829 parameters: &mut QueryPlanningParameters,
830 processor: &mut FetchDependencyGraphToQueryPlanProcessor,
831 has_defers: bool,
832 non_local_selection_state: &mut Option<non_local_selections_estimation::State>,
833) -> Result<(Option<PlanNode>, QueryPlanCost), FederationError> {
834 let root_kind = parameters.operation.root_kind;
835
836 let (main, deferred, primary_selection, cost) = if root_kind
837 == SchemaRootDefinitionKind::Mutation
838 {
839 let dependency_graphs = compute_root_serial_dependency_graph_for_mutation(
840 parameters,
841 has_defers,
842 non_local_selection_state,
843 )?;
844 let mut main = None;
845 let mut deferred = vec![];
846 let mut primary_selection = None::<SelectionSet>;
847 for mut dependency_graph in dependency_graphs {
848 let (local_main, local_deferred) =
849 dependency_graph.process(&mut *processor, root_kind)?;
850 main = match main {
851 Some(unlocal_main) => processor.reduce_sequence([Some(unlocal_main), local_main]),
852 None => local_main,
853 };
854 deferred.extend(local_deferred);
855 let new_selection = dependency_graph.defer_tracking.primary_selection;
856 match primary_selection.as_mut() {
857 Some(selection) => {
858 if let Some(new_selection) = new_selection {
859 selection.add_local_selection_set(&new_selection)?
860 }
861 }
862 None => primary_selection = new_selection,
863 }
864 }
865 (main, deferred, primary_selection, f64::NAN)
867 } else {
868 let (mut dependency_graph, cost) = compute_root_parallel_dependency_graph(
869 parameters,
870 has_defers,
871 non_local_selection_state,
872 )?;
873
874 let (main, deferred) = dependency_graph.process(&mut *processor, root_kind)?;
875 snapshot!(
876 "FetchDependencyGraph",
877 dependency_graph.to_dot(),
878 "Plan after calling FetchDependencyGraph::process"
879 );
880 let primary_selection = dependency_graph.defer_tracking.primary_selection;
882
883 (main, deferred, primary_selection, cost)
884 };
885
886 if deferred.is_empty() {
887 Ok((main, cost))
888 } else {
889 let Some(primary_selection) = primary_selection else {
890 unreachable!("Should have had a primary selection created");
891 };
892 let reduced_main = processor.reduce_defer(main, &primary_selection, deferred)?;
893 Ok((reduced_main, cost))
894 }
895}
896
897fn compute_plan_for_defer_conditionals(
898 parameters: &mut QueryPlanningParameters,
899 processor: &mut FetchDependencyGraphToQueryPlanProcessor,
900 defer_conditions: IndexMap<Name, IndexSet<String>>,
901 non_local_selection_state: &mut Option<non_local_selections_estimation::State>,
902) -> Result<(Option<PlanNode>, QueryPlanCost), FederationError> {
903 generate_condition_nodes(
904 parameters.operation.clone(),
905 defer_conditions.iter(),
906 &mut |op| {
907 parameters.operation = op;
908 compute_plan_internal(parameters, processor, true, non_local_selection_state)
909 },
910 )
911}
912
913fn generate_condition_nodes<'a>(
914 op: Arc<Operation>,
915 mut conditions: impl Clone + Iterator<Item = (&'a Name, &'a IndexSet<String>)>,
916 on_final_operation: &mut impl FnMut(
917 Arc<Operation>,
918 ) -> Result<(Option<PlanNode>, f64), FederationError>,
919) -> Result<(Option<PlanNode>, f64), FederationError> {
920 match conditions.next() {
921 None => on_final_operation(op),
922 Some((cond, labels)) => {
923 let else_op = Arc::unwrap_or_clone(op.clone()).reduce_defer(labels)?;
924 let if_op = op;
925 let (if_node, if_cost) =
926 generate_condition_nodes(if_op, conditions.clone(), on_final_operation)?;
927 let (else_node, else_cost) = generate_condition_nodes(
928 Arc::new(else_op),
929 conditions.clone(),
930 on_final_operation,
931 )?;
932 let node = ConditionNode {
933 condition_variable: cond.clone(),
934 if_clause: if_node.map(Box::new),
935 else_clause: else_node.map(Box::new),
936 };
937 Ok((
938 Some(PlanNode::Condition(Box::new(node))),
939 if_cost.max(else_cost),
940 ))
941 }
942 }
943}
944
/// How a subgraph operation is turned into an executable document by `compress`.
pub(crate) enum SubgraphOperationCompression {
    /// Produce the document through `Operation::generate_fragments`.
    GenerateFragments,
    /// Convert the operation into a document directly, without generating fragments.
    Disabled,
}
949
950impl SubgraphOperationCompression {
951 pub(crate) fn compress(
953 &mut self,
954 operation: Operation,
955 ) -> Result<Valid<ExecutableDocument>, FederationError> {
956 match self {
957 Self::GenerateFragments => Ok(operation.generate_fragments()?),
958 Self::Disabled => {
959 let operation_document = operation.try_into().map_err(|err: FederationError| {
960 if err.has_invalid_graphql_error() {
961 internal_error!(
962 "Query planning produced an invalid subgraph operation.\n{err}"
963 )
964 } else {
965 err
966 }
967 })?;
968 Ok(operation_document)
969 }
970 }
971 }
972}
973
974#[cfg(test)]
975mod tests {
976 use super::*;
977
978 const TEST_SUPERGRAPH: &str = r#"
979schema
980 @link(url: "https://specs.apollo.dev/link/v1.0")
981 @link(url: "https://specs.apollo.dev/join/v0.2", for: EXECUTION)
982{
983 query: Query
984}
985
986directive @join__field(graph: join__Graph!, requires: join__FieldSet, provides: join__FieldSet, type: String, external: Boolean, override: String, usedOverridden: Boolean) repeatable on FIELD_DEFINITION | INPUT_FIELD_DEFINITION
987
988directive @join__graph(name: String!, url: String!) on ENUM_VALUE
989
990directive @join__implements(graph: join__Graph!, interface: String!) repeatable on OBJECT | INTERFACE
991
992directive @join__type(graph: join__Graph!, key: join__FieldSet, extension: Boolean! = false, resolvable: Boolean! = true) repeatable on OBJECT | INTERFACE | UNION | ENUM | INPUT_OBJECT | SCALAR
993
994directive @link(url: String, as: String, for: link__Purpose, import: [link__Import]) repeatable on SCHEMA
995
996type Book implements Product
997 @join__implements(graph: PRODUCTS, interface: "Product")
998 @join__implements(graph: REVIEWS, interface: "Product")
999 @join__type(graph: PRODUCTS, key: "id")
1000 @join__type(graph: REVIEWS, key: "id")
1001{
1002 id: ID!
1003 price: Price @join__field(graph: PRODUCTS)
1004 title: String @join__field(graph: PRODUCTS)
1005 vendor: User @join__field(graph: PRODUCTS)
1006 pages: Int @join__field(graph: PRODUCTS)
1007 avg_rating: Int @join__field(graph: PRODUCTS, requires: "reviews { rating }")
1008 reviews: [Review] @join__field(graph: PRODUCTS, external: true) @join__field(graph: REVIEWS)
1009}
1010
1011enum Currency
1012 @join__type(graph: PRODUCTS)
1013{
1014 USD
1015 EUR
1016}
1017
1018scalar join__FieldSet
1019
1020enum join__Graph {
1021 ACCOUNTS @join__graph(name: "accounts", url: "")
1022 PRODUCTS @join__graph(name: "products", url: "")
1023 REVIEWS @join__graph(name: "reviews", url: "")
1024}
1025
1026scalar link__Import
1027
1028enum link__Purpose {
1029 """
1030 `SECURITY` features provide metadata necessary to securely resolve fields.
1031 """
1032 SECURITY
1033
1034 """
1035 `EXECUTION` features provide metadata necessary for operation execution.
1036 """
1037 EXECUTION
1038}
1039
1040type Movie implements Product
1041 @join__implements(graph: PRODUCTS, interface: "Product")
1042 @join__implements(graph: REVIEWS, interface: "Product")
1043 @join__type(graph: PRODUCTS, key: "id")
1044 @join__type(graph: REVIEWS, key: "id")
1045{
1046 id: ID!
1047 price: Price @join__field(graph: PRODUCTS)
1048 title: String @join__field(graph: PRODUCTS)
1049 vendor: User @join__field(graph: PRODUCTS)
1050 length_minutes: Int @join__field(graph: PRODUCTS)
1051 avg_rating: Int @join__field(graph: PRODUCTS, requires: "reviews { rating }")
1052 reviews: [Review] @join__field(graph: PRODUCTS, external: true) @join__field(graph: REVIEWS)
1053}
1054
1055type Price
1056 @join__type(graph: PRODUCTS)
1057{
1058 value: Int
1059 currency: Currency
1060}
1061
1062interface Product
1063 @join__type(graph: PRODUCTS)
1064 @join__type(graph: REVIEWS)
1065{
1066 id: ID!
1067 price: Price @join__field(graph: PRODUCTS)
1068 vendor: User @join__field(graph: PRODUCTS)
1069 avg_rating: Int @join__field(graph: PRODUCTS)
1070 reviews: [Review] @join__field(graph: REVIEWS)
1071}
1072
1073type Query
1074 @join__type(graph: ACCOUNTS)
1075 @join__type(graph: PRODUCTS)
1076 @join__type(graph: REVIEWS)
1077{
1078 userById(id: ID!): User @join__field(graph: ACCOUNTS)
1079 me: User! @join__field(graph: ACCOUNTS) @join__field(graph: REVIEWS)
1080 productById(id: ID!): Product @join__field(graph: PRODUCTS)
1081 search(filter: SearchFilter): [Product] @join__field(graph: PRODUCTS)
1082 bestRatedProducts(limit: Int): [Product] @join__field(graph: REVIEWS)
1083}
1084
1085type Review
1086 @join__type(graph: PRODUCTS)
1087 @join__type(graph: REVIEWS)
1088{
1089 rating: Int @join__field(graph: PRODUCTS, external: true) @join__field(graph: REVIEWS)
1090 product: Product @join__field(graph: REVIEWS)
1091 author: User @join__field(graph: REVIEWS)
1092 text: String @join__field(graph: REVIEWS)
1093}
1094
1095input SearchFilter
1096 @join__type(graph: PRODUCTS)
1097{
1098 pattern: String!
1099 vendorName: String
1100}
1101
1102type User
1103 @join__type(graph: ACCOUNTS, key: "id")
1104 @join__type(graph: PRODUCTS, key: "id", resolvable: false)
1105 @join__type(graph: REVIEWS, key: "id")
1106{
1107 id: ID!
1108 name: String @join__field(graph: ACCOUNTS)
1109 email: String @join__field(graph: ACCOUNTS)
1110 password: String @join__field(graph: ACCOUNTS)
1111 nickname: String @join__field(graph: ACCOUNTS, override: "reviews")
1112 reviews: [Review] @join__field(graph: REVIEWS)
1113}
1114 "#;
1115
/// A query that only touches `userById { name email }` is expected to be planned
/// as a single `Fetch` against the `accounts` subgraph.
#[test]
fn plan_simple_query_for_single_subgraph() {
    let operation = r#"
        {
            userById(id: 1) {
                name
                email
            }
        }
    "#;

    let supergraph = Supergraph::new(TEST_SUPERGRAPH).unwrap();
    let planner = QueryPlanner::new(&supergraph, Default::default()).unwrap();

    // Validate the operation against the API schema exposed by the planner itself.
    let api_schema = planner.api_schema();
    let document =
        ExecutableDocument::parse_and_validate(api_schema.schema(), operation, "operation.graphql")
            .unwrap();

    let query_plan = planner
        .build_query_plan(&document, None, Default::default())
        .unwrap();
    insta::assert_snapshot!(query_plan, @r###"
    QueryPlan {
      Fetch(service: "accounts") {
        {
          userById(id: 1) {
            name
            email
          }
        }
      },
    }
    "###);
}
1150
/// `bestRatedProducts { vendor { name } }` is expected to be planned as a
/// `Sequence` of three dependent fetches: `reviews` for the root field,
/// `products` for `vendor`, then `accounts` for the vendor's `name`.
#[test]
fn plan_simple_query_for_multiple_subgraphs() {
    let operation = r#"
        {
            bestRatedProducts {
                vendor { name }
            }
        }
    "#;

    let supergraph = Supergraph::new(TEST_SUPERGRAPH).unwrap();
    let planner = QueryPlanner::new(&supergraph, Default::default()).unwrap();

    // Validate the operation against the API schema exposed by the planner itself.
    let api_schema = planner.api_schema();
    let document =
        ExecutableDocument::parse_and_validate(api_schema.schema(), operation, "operation.graphql")
            .unwrap();

    let query_plan = planner
        .build_query_plan(&document, None, Default::default())
        .unwrap();
    insta::assert_snapshot!(query_plan, @r###"
    QueryPlan {
      Sequence {
        Fetch(service: "reviews") {
          {
            bestRatedProducts {
              __typename
              ... on Book {
                __typename
                id
              }
              ... on Movie {
                __typename
                id
              }
            }
          }
        },
        Flatten(path: "bestRatedProducts.@") {
          Fetch(service: "products") {
            {
              ... on Book {
                __typename
                id
              }
              ... on Movie {
                __typename
                id
              }
            } =>
            {
              ... on Book {
                vendor {
                  __typename
                  id
                }
              }
              ... on Movie {
                vendor {
                  __typename
                  id
                }
              }
            }
          },
        },
        Flatten(path: "bestRatedProducts.@.vendor") {
          Fetch(service: "accounts") {
            {
              ... on User {
                __typename
                id
              }
            } =>
            {
              ... on User {
                name
              }
            }
          },
        },
      },
    }
    "###);
}
1236
/// Two root fields served by different subgraphs are expected to be planned
/// under a `Parallel` node: a plain `accounts` fetch for `userById`, next to a
/// `reviews` -> `products` sequence for `bestRatedProducts`.
///
/// The snapshot shows `reviews { rating }` being passed to `products` as input
/// for `avg_rating`.
#[test]
fn plan_simple_root_field_query_for_multiple_subgraphs() {
    let operation = r#"
        {
            userById(id: 1) {
                name
                email
            }
            bestRatedProducts {
                id
                avg_rating
            }
        }
    "#;

    let supergraph = Supergraph::new(TEST_SUPERGRAPH).unwrap();
    let planner = QueryPlanner::new(&supergraph, Default::default()).unwrap();

    // Validate the operation against the API schema exposed by the planner itself.
    let api_schema = planner.api_schema();
    let document =
        ExecutableDocument::parse_and_validate(api_schema.schema(), operation, "operation.graphql")
            .unwrap();

    let query_plan = planner
        .build_query_plan(&document, None, Default::default())
        .unwrap();
    insta::assert_snapshot!(query_plan, @r###"
    QueryPlan {
      Parallel {
        Fetch(service: "accounts") {
          {
            userById(id: 1) {
              name
              email
            }
          }
        },
        Sequence {
          Fetch(service: "reviews") {
            {
              bestRatedProducts {
                __typename
                id
                ... on Book {
                  __typename
                  id
                  reviews {
                    rating
                  }
                }
                ... on Movie {
                  __typename
                  id
                  reviews {
                    rating
                  }
                }
              }
            }
          },
          Flatten(path: "bestRatedProducts.@") {
            Fetch(service: "products") {
              {
                ... on Book {
                  __typename
                  id
                  reviews {
                    rating
                  }
                }
                ... on Movie {
                  __typename
                  id
                  reviews {
                    rating
                  }
                }
              } =>
              {
                ... on Book {
                  avg_rating
                }
                ... on Movie {
                  avg_rating
                }
              }
            },
          },
        },
      },
    }
    "###);
}
1329
/// With `generate_query_fragments` enabled, the plan for this operation is
/// still expected to contain no fragment definitions: the `userFields` spread
/// from the input appears expanded inline in the resulting `accounts` fetch.
#[test]
fn test_optimize_no_fragments_generated() {
    let operation = r#"
        {
            userById(id: 1) {
                id
                ...userFields
            },
            another_user: userById(id: 2) {
                name
                email
            }
        }
        fragment userFields on User {
            name
            email
        }
    "#;

    let supergraph = Supergraph::new(TEST_SUPERGRAPH).unwrap();

    // Planner with fragment generation switched on; everything else is default.
    let planner_config = QueryPlannerConfig {
        generate_query_fragments: true,
        ..Default::default()
    };
    let planner = QueryPlanner::new(&supergraph, planner_config).unwrap();

    // The operation is validated against an API schema derived directly from the supergraph.
    let api_schema = supergraph.to_api_schema(Default::default()).unwrap();
    let document =
        ExecutableDocument::parse_and_validate(api_schema.schema(), operation, "operation.graphql")
            .unwrap();

    let query_plan = planner
        .build_query_plan(&document, None, Default::default())
        .unwrap();
    insta::assert_snapshot!(query_plan, @r###"
    QueryPlan {
      Fetch(service: "accounts") {
        {
          userById(id: 1) {
            id
            name
            email
          }
          another_user: userById(id: 2) {
            name
            email
          }
        }
      },
    }
    "###);
}
1382
/// A `__typename` selected directly on the operation root is expected to be
/// absent from the subgraph fetch: the snapshot only requests
/// `bestRatedProducts` from `reviews`.
#[test]
fn drop_operation_root_level_typename() {
    let operation = r#"
        {
            __typename
            bestRatedProducts {
                id
            }
        }
    "#;

    let supergraph = Supergraph::new(TEST_SUPERGRAPH).unwrap();
    let planner = QueryPlanner::new(&supergraph, Default::default()).unwrap();

    // Validate the operation against the API schema exposed by the planner itself.
    let api_schema = planner.api_schema();
    let document =
        ExecutableDocument::parse_and_validate(api_schema.schema(), operation, "operation.graphql")
            .unwrap();

    let query_plan = planner
        .build_query_plan(&document, None, Default::default())
        .unwrap();
    insta::assert_snapshot!(query_plan, @r###"
    QueryPlan {
      Fetch(service: "reviews") {
        {
          bestRatedProducts {
            __typename
            id
          }
        }
      },
    }
    "###);
}
1418}