use std::cell::Cell;
use std::num::NonZeroU32;
use std::ops::ControlFlow;
use std::sync::Arc;
use apollo_compiler::ExecutableDocument;
use apollo_compiler::Name;
use apollo_compiler::collections::IndexMap;
use apollo_compiler::collections::IndexSet;
use apollo_compiler::validation::Valid;
use itertools::Itertools;
use petgraph::visit::EdgeRef;
use serde::Deserialize;
use serde::Serialize;
use tracing::trace;
use super::ConditionNode;
use super::QueryPlanCost;
use super::fetch_dependency_graph::FetchIdGenerator;
use crate::ApiSchemaOptions;
use crate::Supergraph;
use crate::bail;
use crate::error::FederationError;
use crate::error::SingleFederationError;
use crate::internal_error;
use crate::operation::NormalizedDefer;
use crate::operation::Operation;
use crate::operation::SelectionSet;
use crate::operation::normalize_operation;
use crate::query_graph::OverrideConditions;
use crate::query_graph::QueryGraph;
use crate::query_graph::QueryGraphNodeType;
use crate::query_graph::build_federated_query_graph;
use crate::query_graph::path_tree::OpPathTree;
use crate::query_plan::PlanNode;
use crate::query_plan::QueryPlan;
use crate::query_plan::SequenceNode;
use crate::query_plan::TopLevelPlanNode;
use crate::query_plan::fetch_dependency_graph::FetchDependencyGraph;
use crate::query_plan::fetch_dependency_graph::FetchDependencyGraphNodePath;
use crate::query_plan::fetch_dependency_graph::compute_nodes_for_tree;
use crate::query_plan::fetch_dependency_graph_processor::FetchDependencyGraphProcessor;
use crate::query_plan::fetch_dependency_graph_processor::FetchDependencyGraphToCostProcessor;
use crate::query_plan::fetch_dependency_graph_processor::FetchDependencyGraphToQueryPlanProcessor;
use crate::query_plan::query_planning_traversal::BestQueryPlanInfo;
use crate::query_plan::query_planning_traversal::QueryPlanningParameters;
use crate::query_plan::query_planning_traversal::QueryPlanningTraversal;
use crate::query_plan::query_planning_traversal::convert_type_from_subgraph;
use crate::query_plan::query_planning_traversal::non_local_selections_estimation;
use crate::schema::ValidFederationSchema;
use crate::schema::position::AbstractTypeDefinitionPosition;
use crate::schema::position::CompositeTypeDefinitionPosition;
use crate::schema::position::InterfaceTypeDefinitionPosition;
use crate::schema::position::ObjectTypeDefinitionPosition;
use crate::schema::position::OutputTypeDefinitionPosition;
use crate::schema::position::SchemaRootDefinitionKind;
use crate::schema::position::TypeDefinitionPosition;
use crate::utils::logging::snapshot;
/// Configuration for a `QueryPlanner`, supplied once at construction time.
#[derive(Debug, Clone, Hash, Serialize)]
pub struct QueryPlannerConfig {
/// When `true`, `build_query_plan` compresses subgraph operations with
/// `SubgraphOperationCompression::GenerateFragments`; otherwise compression is disabled.
pub generate_query_fragments: bool,
/// NOTE(review): not read anywhere in this file; presumably toggles GraphQL validation of
/// the generated subgraph operations — confirm against the consumers of this flag.
pub subgraph_graphql_validation: bool,
/// Incremental delivery (`@defer`) settings.
pub incremental_delivery: QueryPlanIncrementalDeliveryConfig,
/// Debugging/tuning knobs for the planner.
pub debug: QueryPlannerDebugConfig,
/// Forwarded as the `type_conditioned_fetching_enabled` flag when root fetch groups are
/// computed.
pub type_conditioned_fetching: bool,
}
#[allow(clippy::derivable_impls)] impl Default for QueryPlannerConfig {
fn default() -> Self {
Self {
generate_query_fragments: false,
subgraph_graphql_validation: false,
incremental_delivery: Default::default(),
debug: Default::default(),
type_conditioned_fetching: false,
}
}
}
/// Incremental delivery settings for query planning.
#[derive(Debug, Clone, Default, Hash, Serialize)]
pub struct QueryPlanIncrementalDeliveryConfig {
/// Whether `@defer` support is enabled. `QueryPlanner::new` forwards this value as
/// `ApiSchemaOptions::include_defer` when deriving the API schema. Defaults to `false`.
#[serde(default)]
pub enable_defer: bool,
}
/// Debugging/tuning knobs for the query planner.
#[derive(Debug, Clone, Hash, Serialize)]
pub struct QueryPlannerDebugConfig {
/// NOTE(review): presumably an upper bound on the number of candidate plans evaluated for
/// one operation — it is not consumed in this file, confirm in the planning traversal.
/// Defaults to 10,000.
pub max_evaluated_plans: NonZeroU32,
/// NOTE(review): presumably an optional cap on the number of paths considered, with `None`
/// (the default) meaning no cap — confirm against the consumer.
pub paths_limit: Option<u32>,
}
impl Default for QueryPlannerDebugConfig {
    /// Returns the default debug configuration: `max_evaluated_plans` is 10,000 and
    /// `paths_limit` is unset.
    fn default() -> Self {
        // The literal is non-zero, so the conversion below cannot fail.
        let max_evaluated_plans = NonZeroU32::new(10_000).unwrap();
        Self {
            max_evaluated_plans,
            paths_limit: None,
        }
    }
}
/// Statistics gathered while planning a single operation.
///
/// The counters are `Cell`s, so they can be updated through the shared reference that
/// `build_query_plan` hands to the planning parameters.
#[derive(Debug, Clone, PartialEq, Default, Serialize, Deserialize)]
pub struct QueryPlanningStatistics {
/// NOTE(review): presumably the number of candidate plans evaluated — it is updated outside
/// this file, confirm in the planning traversal.
pub evaluated_plan_count: Cell<usize>,
/// NOTE(review): presumably the number of paths evaluated — it is updated outside this
/// file, confirm in the planning traversal.
pub evaluated_plan_paths: Cell<usize>,
/// Cost of the selected plan, filled in by `build_query_plan`. It can be NaN (mutation
/// plans report `f64::NAN`); a `null` input is deserialized back to NaN by
/// `deserialize_f64_nullable`.
#[serde(deserialize_with = "deserialize_f64_nullable")]
pub best_plan_cost: f64,
}
fn deserialize_f64_nullable<'de, D>(deserializer: D) -> Result<f64, D::Error>
where
D: serde::de::Deserializer<'de>,
{
let opt = Option::<f64>::deserialize(deserializer)?;
Ok(opt.unwrap_or(f64::NAN))
}
/// Per-request options for `QueryPlanner::build_query_plan`.
#[derive(Clone)]
pub struct QueryPlanOptions<'a> {
/// Override condition labels for this request; `build_query_plan` uses them to build the
/// `OverrideConditions` handed to planning.
pub override_conditions: Vec<String>,
/// Optional callback used for cooperative cancellation during normalization and planning.
/// NOTE(review): presumably returning `ControlFlow::Break(())` aborts planning — confirm in
/// `QueryPlanningParameters::check_cancellation_with`.
pub check_for_cooperative_cancellation: Option<&'a dyn Fn() -> ControlFlow<()>>,
/// When `true` (the default), planning is given a fresh
/// `non_local_selections_estimation::State`; when `false`, no such state is created.
pub non_local_selections_limit_enabled: bool,
/// Names of subgraphs to treat as disabled; they are matched against the subgraph names of
/// the federated query graph.
pub disabled_subgraph_names: IndexSet<String>,
}
impl Default for QueryPlanOptions<'_> {
fn default() -> Self {
Self {
override_conditions: Vec::new(),
check_for_cooperative_cancellation: None,
non_local_selections_limit_enabled: true,
disabled_subgraph_names: Default::default(),
}
}
}
impl std::fmt::Debug for QueryPlanOptions<'_> {
    /// Formats the options for debugging.
    ///
    /// Implemented by hand because the cancellation callback is a `dyn Fn` and has no `Debug`
    /// representation; only its presence is reported. Every other field is printed, including
    /// `disabled_subgraph_names`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("QueryPlanOptions")
            .field("override_conditions", &self.override_conditions)
            .field(
                "check_for_cooperative_cancellation",
                // The closure itself cannot be printed; show whether one was supplied.
                if self.check_for_cooperative_cancellation.is_some() {
                    &"Some(...)"
                } else {
                    &"None"
                },
            )
            .field(
                "non_local_selections_limit_enabled",
                &self.non_local_selections_limit_enabled,
            )
            .field("disabled_subgraph_names", &self.disabled_subgraph_names)
            .finish()
    }
}
/// Builds query plans for operations against a supergraph.
pub struct QueryPlanner {
/// Planner configuration supplied at construction.
config: QueryPlannerConfig,
/// Federated query graph built from the supergraph schema and the API schema.
federated_query_graph: Arc<QueryGraph>,
/// The supergraph schema this planner was built from.
supergraph_schema: ValidFederationSchema,
/// API schema derived from the supergraph; operations are normalized against it.
api_schema: ValidFederationSchema,
/// Interface types that at least one subgraph defines as an interface object type.
interface_types_with_interface_objects: IndexSet<InterfaceTypeDefinitionPosition>,
/// Names of abstract types (interfaces and unions) whose sets of runtime object types differ
/// between the subgraphs that define them.
abstract_types_with_inconsistent_runtime_types: IndexSet<Name>,
}
impl QueryPlanner {
#[cfg_attr(
feature = "snapshot_tracing",
tracing::instrument(level = "trace", skip_all, name = "QueryPlanner::new")
)]
pub fn new(
supergraph: &Supergraph,
config: QueryPlannerConfig,
) -> Result<Self, FederationError> {
let supergraph_schema = supergraph.schema.clone();
let api_schema = supergraph.to_api_schema(ApiSchemaOptions {
include_defer: config.incremental_delivery.enable_defer,
..Default::default()
})?;
let query_graph = build_federated_query_graph(
supergraph_schema.clone(),
api_schema.clone(),
Some(true),
Some(true),
)?;
let interface_types_with_interface_objects = supergraph
.schema
.get_types()
.filter_map(|position| match position {
TypeDefinitionPosition::Interface(interface_position) => Some(interface_position),
_ => None,
})
.map(|position| {
let is_interface_object = query_graph
.subgraphs()
.map(|(_name, schema)| {
let Some(position) = schema.try_get_type(position.type_name.clone()) else {
return Ok(false);
};
schema.is_interface_object_type(position)
})
.process_results(|mut iter| iter.any(|b| b))?;
Ok::<_, FederationError>((position, is_interface_object))
})
.process_results(|iter| {
iter.flat_map(|(position, is_interface_object)| {
if is_interface_object {
Some(position)
} else {
None
}
})
.collect::<IndexSet<_>>()
})?;
let is_inconsistent = |position: AbstractTypeDefinitionPosition| {
let mut sources = query_graph.subgraphs().filter_map(|(_name, subgraph)| {
match subgraph.try_get_type(position.type_name().clone())? {
TypeDefinitionPosition::Object(_) => None,
TypeDefinitionPosition::Interface(interface) => Some(
subgraph
.referencers()
.get_interface_type(&interface.type_name)
.ok()?
.object_types
.clone(),
),
TypeDefinitionPosition::Union(union_) => Some(
union_
.try_get(subgraph.schema())?
.members
.iter()
.map(|member| ObjectTypeDefinitionPosition::new(member.name.clone()))
.collect(),
),
_ => None,
}
});
let Some(expected_runtimes) = sources.next() else {
return false;
};
!sources.all(|runtimes| runtimes == expected_runtimes)
};
let abstract_types_with_inconsistent_runtime_types = supergraph
.schema
.get_types()
.filter_map(|position| AbstractTypeDefinitionPosition::try_from(position).ok())
.filter(|position| is_inconsistent(position.clone()))
.map(|position| position.type_name().clone())
.collect::<IndexSet<_>>();
Ok(Self {
config,
federated_query_graph: Arc::new(query_graph),
supergraph_schema,
api_schema,
interface_types_with_interface_objects,
abstract_types_with_inconsistent_runtime_types,
})
}
/// Returns the subgraph schemas known to the federated query graph, keyed by subgraph name.
pub fn subgraph_schemas(&self) -> &IndexMap<Arc<str>, ValidFederationSchema> {
self.federated_query_graph.subgraph_schemas()
}
/// Builds a query plan for one operation of `document`.
///
/// `operation_name` selects the operation inside `document`; `options` carries the
/// per-request settings (override conditions, cancellation callback, disabled subgraphs, ...).
///
/// Returns `QueryPlan::default()` when the operation's selection set is empty after
/// normalization. Otherwise the returned plan carries the planning statistics, with
/// `best_plan_cost` set to the cost reported by planning.
///
/// # Errors
/// - `UnknownOperation` when a name was given but the lookup failed, or
///   `OperationNameNotProvided` when no name was given and the lookup failed.
/// - An internal error when the operation's selection set is empty.
/// - `DeferredSubscriptionUnsupported` when a subscription contains defers.
/// - An internal error when the query graph has no root for the operation kind, or when a
///   subscription plan does not start with a `Fetch`.
/// - Any error propagated from normalization or planning.
#[cfg_attr(
feature = "snapshot_tracing",
tracing::instrument(level = "trace", skip_all, name = "QueryPlanner::build_query_plan")
)]
pub fn build_query_plan(
&self,
document: &Valid<ExecutableDocument>,
operation_name: Option<Name>,
options: QueryPlanOptions,
) -> Result<QueryPlan, FederationError> {
// Look up the requested operation; the error variant depends on whether a name was given.
let operation = document
.operations
.get(operation_name.as_ref().map(|name| name.as_str()))
.map_err(|_| {
if operation_name.is_some() {
SingleFederationError::UnknownOperation
} else {
SingleFederationError::OperationNameNotProvided
}
})?;
if operation.selection_set.is_empty() {
crate::bail!("Invalid operation: empty selection set")
}
let is_subscription = operation.is_subscription();
// Shared by reference with the planning parameters below; moved into the plan at the end.
let statistics = QueryPlanningStatistics::default();
let normalized_operation = normalize_operation(
operation,
&document.fragments,
&self.api_schema,
&self.interface_types_with_interface_objects,
&|| {
QueryPlanningParameters::check_cancellation_with(
&options.check_for_cooperative_cancellation,
)
},
)?;
let NormalizedDefer {
operation: normalized_operation,
assigned_defer_labels,
defer_conditions,
has_defers,
} = normalized_operation.with_normalized_defer()?;
if has_defers && is_subscription {
return Err(SingleFederationError::DeferredSubscriptionUnsupported.into());
}
// Nothing left to fetch after normalization: return an empty plan.
if normalized_operation.selection_set.is_empty() {
return Ok(QueryPlan::default());
}
snapshot!(
"NormalizedOperation",
serde_json_bytes::json!({
"original": &operation.serialize().to_string(),
"normalized": &normalized_operation.to_string()
})
.to_string(),
"normalized operation"
);
// Find the query graph node that planning starts from for this operation kind.
let Some(root) = self
.federated_query_graph
.root_kinds_to_nodes()?
.get(&normalized_operation.root_kind)
else {
bail!(
"Shouldn't have a {0} operation if the subgraphs don't have a {0} root",
normalized_operation.root_kind
)
};
let operation_compression = if self.config.generate_query_fragments {
SubgraphOperationCompression::GenerateFragments
} else {
SubgraphOperationCompression::Disabled
};
let mut processor = FetchDependencyGraphToQueryPlanProcessor::new(
normalized_operation.variables.clone(),
normalized_operation.directives.clone(),
operation_compression,
operation.name.clone(),
assigned_defer_labels,
);
let mut parameters = QueryPlanningParameters {
supergraph_schema: self.supergraph_schema.clone(),
federated_query_graph: self.federated_query_graph.clone(),
operation: Arc::new(normalized_operation),
head: *root,
head_must_be_root: true,
statistics: &statistics,
abstract_types_with_inconsistent_runtime_types: self
.abstract_types_with_inconsistent_runtime_types
.clone()
.into(),
config: self.config.clone(),
override_conditions: OverrideConditions::new(
&self.federated_query_graph,
&IndexSet::from_iter(options.override_conditions),
),
check_for_cooperative_cancellation: options.check_for_cooperative_cancellation,
fetch_id_generator: Arc::new(FetchIdGenerator::new()),
// Keep only the requested names that match a subgraph of the query graph.
disabled_subgraphs: self
.federated_query_graph
.subgraphs()
.filter_map(|(subgraph, _)| {
if options.disabled_subgraph_names.contains(subgraph.as_ref()) {
Some(subgraph.clone())
} else {
None
}
})
.collect(),
};
// The estimation state only exists when the non-local selections limit is enabled.
let mut non_local_selection_state = options
.non_local_selections_limit_enabled
.then(non_local_selections_estimation::State::default);
// Conditional defers are planned once per branch and wrapped in condition nodes.
let (root_node, cost) = if !defer_conditions.is_empty() {
compute_plan_for_defer_conditionals(
&mut parameters,
&mut processor,
defer_conditions,
&mut non_local_selection_state,
)
} else {
compute_plan_internal(
&mut parameters,
&mut processor,
has_defers,
&mut non_local_selection_state,
)
}?;
// Convert the plan root into a top-level node. Subscriptions must start with a Fetch
// (the primary); any remaining sequence nodes become `rest`.
let root_node = match root_node {
Some(PlanNode::Fetch(root_node)) if is_subscription => Some(
TopLevelPlanNode::Subscription(crate::query_plan::SubscriptionNode {
primary: root_node,
rest: None,
}),
),
Some(PlanNode::Sequence(root_node)) if is_subscription => {
let Some((primary, rest)) = root_node.nodes.split_first() else {
bail!("Invalid query plan: Sequence must have at least one node");
};
let PlanNode::Fetch(primary) = primary.clone() else {
bail!("Invalid query plan: Primary node of a subscription is not a Fetch");
};
let rest = PlanNode::Sequence(SequenceNode {
nodes: rest.to_vec(),
});
Some(TopLevelPlanNode::Subscription(
crate::query_plan::SubscriptionNode {
primary,
rest: Some(Box::new(rest)),
},
))
}
Some(node) if is_subscription => {
bail!(
"Invalid query plan for subscription: unexpected {} at root",
node.node_kind()
);
}
Some(PlanNode::Fetch(inner)) => Some(TopLevelPlanNode::Fetch(inner)),
Some(PlanNode::Sequence(inner)) => Some(TopLevelPlanNode::Sequence(inner)),
Some(PlanNode::Parallel(inner)) => Some(TopLevelPlanNode::Parallel(inner)),
Some(PlanNode::Flatten(inner)) => Some(TopLevelPlanNode::Flatten(inner)),
Some(PlanNode::Defer(inner)) => Some(TopLevelPlanNode::Defer(inner)),
Some(PlanNode::Condition(inner)) => Some(TopLevelPlanNode::Condition(inner)),
None => None,
};
// Attach the statistics, overriding the cost with the one reported by planning.
let plan = QueryPlan {
node: root_node,
statistics: QueryPlanningStatistics {
best_plan_cost: cost,
..statistics
},
};
snapshot!(
"QueryPlan",
plan.to_string(),
"QueryPlan from build_query_plan"
);
snapshot!(
plan.statistics,
"QueryPlanningStatistics from build_query_plan"
);
Ok(plan)
}
/// Returns the API schema that was derived from the supergraph when the planner was created.
pub fn api_schema(&self) -> &ValidFederationSchema {
&self.api_schema
}
/// Returns the supergraph schema this planner was created from.
pub fn supergraph_schema(&self) -> &ValidFederationSchema {
&self.supergraph_schema
}
/// Returns the override condition labels reported by the federated query graph.
pub fn override_condition_labels(&self) -> &IndexSet<Arc<str>> {
self.federated_query_graph.override_condition_labels()
}
}
/// Plans a mutation one top-level field at a time and returns the resulting fetch dependency
/// graphs in field order.
///
/// Each top-level field is planned on its own. When a field's plan is rooted in the same
/// subgraph as the previous one, its path tree is merged into the previous tree and the
/// current graph is rebuilt from the merged tree; otherwise the current graph is pushed to
/// the result and a new one is started.
///
/// # Errors
/// Returns an internal error when the operation has no top-level fields or when a plan does
/// not have exactly one root subgraph; planning errors are propagated.
fn compute_root_serial_dependency_graph_for_mutation(
parameters: &QueryPlanningParameters,
has_defers: bool,
non_local_selection_state: &mut Option<non_local_selections_estimation::State>,
) -> Result<Vec<FetchDependencyGraph>, FederationError> {
let QueryPlanningParameters {
supergraph_schema,
federated_query_graph,
operation,
..
} = parameters;
// The root type is only looked up when the operation has defers; otherwise `None` is
// passed to rebuilt graphs.
let root_type: Option<CompositeTypeDefinitionPosition> = if has_defers {
supergraph_schema
.schema()
.root_operation(operation.root_kind.into())
.and_then(|name| supergraph_schema.get_type(name.clone()).ok())
.and_then(|ty| ty.try_into().ok())
} else {
None
};
let mut split_roots = operation.selection_set.clone().split_top_level_fields();
let mut digest = Vec::new();
// Plan the first top-level field to seed the "previous" state.
let selection_set = split_roots
.next()
.ok_or_else(|| FederationError::internal("Empty top level fields"))?;
let BestQueryPlanInfo {
mut fetch_dependency_graph,
path_tree: mut prev_path,
..
} = compute_root_parallel_best_plan_for_mutation(
parameters,
selection_set,
has_defers,
non_local_selection_state,
)?;
let mut prev_subgraph = only_root_subgraph(&fetch_dependency_graph)?;
for selection_set in split_roots {
let BestQueryPlanInfo {
fetch_dependency_graph: new_dep_graph,
path_tree: new_path,
..
} = compute_root_parallel_best_plan_for_mutation(
parameters,
selection_set,
has_defers,
non_local_selection_state,
)?;
let new_subgraph = only_root_subgraph(&new_dep_graph)?;
if new_subgraph == prev_subgraph {
// Same root subgraph as the previous field: merge the path trees and rebuild the
// current graph from the merged tree, reusing the same fetch id generator.
Arc::make_mut(&mut prev_path).extend(&new_path);
fetch_dependency_graph = FetchDependencyGraph::new(
supergraph_schema.clone(),
federated_query_graph.clone(),
root_type.clone(),
fetch_dependency_graph.fetch_id_generation.clone(),
);
compute_root_fetch_groups(
operation.root_kind,
federated_query_graph,
&mut fetch_dependency_graph,
&prev_path,
parameters.config.type_conditioned_fetching,
&|| parameters.check_cancellation(),
)?;
} else {
// Different root subgraph: finish the current graph and continue with the new one.
digest.push(std::mem::replace(
&mut fetch_dependency_graph,
new_dep_graph,
));
prev_path = new_path;
prev_subgraph = new_subgraph;
}
}
// The graph still being accumulated is the last element of the result.
digest.push(fetch_dependency_graph);
Ok(digest)
}
/// Returns the name of the single root subgraph of `graph`.
///
/// # Errors
/// Returns an internal error when `graph` has no root node or more than one.
fn only_root_subgraph(graph: &FetchDependencyGraph) -> Result<Arc<str>, FederationError> {
    let mut roots = graph.root_node_by_subgraph_iter();
    // Exactly one root means the first `next()` yields an entry and the second yields nothing.
    match (roots.next(), roots.next()) {
        (Some((subgraph_name, _)), None) => Ok(subgraph_name.clone()),
        _ => Err(FederationError::internal(format!(
            "{graph} should have only one root."
        ))),
    }
}
/// Populates `dependency_graph` with the fetch nodes needed for every root child of `path`.
///
/// For each child of the path tree, the target node of its edge identifies the subgraph and
/// root object type; a root fetch node is obtained (or created) for that subgraph and the
/// child's subtree is then expanded under it with `compute_nodes_for_tree`.
///
/// # Errors
/// Returns an internal error when a root child has no edge or when the target of a root edge
/// is not an object type. Errors from graph lookups, type conversion, node computation and
/// `check_cancellation` are propagated.
#[cfg_attr(
    feature = "snapshot_tracing",
    tracing::instrument(level = "trace", skip_all, name = "compute_root_fetch_groups")
)]
pub(crate) fn compute_root_fetch_groups(
    root_kind: SchemaRootDefinitionKind,
    federated_query_graph: &QueryGraph,
    dependency_graph: &mut FetchDependencyGraph,
    path: &OpPathTree,
    type_conditioned_fetching_enabled: bool,
    check_cancellation: &dyn Fn() -> Result<(), SingleFederationError>,
) -> Result<(), FederationError> {
    for child in &path.childs {
        // A missing root edge is an invariant violation; report it as an internal error
        // instead of panicking, like every other invariant check in this function.
        let Some(edge) = child.edge else {
            return Err(FederationError::internal(
                "The root edge should not be None",
            ));
        };
        let (_source_node, target_node) = path.graph.edge_endpoints(edge)?;
        let target_node = path.graph.node_weight(target_node)?;
        let subgraph_name = &target_node.source;
        let root_type: CompositeTypeDefinitionPosition = match &target_node.type_ {
            QueryGraphNodeType::SchemaType(OutputTypeDefinitionPosition::Object(object)) => {
                object.clone().into()
            }
            ty => {
                return Err(FederationError::internal(format!(
                    "expected an object type for the root of a subgraph, found {ty}"
                )));
            }
        };
        let fetch_dependency_node = dependency_graph.get_or_create_root_node(
            subgraph_name,
            root_kind,
            root_type.clone(),
        )?;
        snapshot!(
            "FetchDependencyGraph",
            dependency_graph.to_dot(),
            "tree_with_root_node"
        );
        // The node path is expressed in supergraph types, so translate the subgraph root type.
        let subgraph_schema = federated_query_graph.schema_by_source(subgraph_name)?;
        let supergraph_root_type = convert_type_from_subgraph(
            root_type,
            subgraph_schema,
            &dependency_graph.supergraph_schema,
        )?;
        compute_nodes_for_tree(
            dependency_graph,
            &child.tree,
            fetch_dependency_node,
            FetchDependencyGraphNodePath::new(
                dependency_graph.supergraph_schema.clone(),
                type_conditioned_fetching_enabled,
                supergraph_root_type,
            )?,
            Default::default(),
            &Default::default(),
            check_cancellation,
        )?;
    }
    Ok(())
}
fn compute_root_parallel_dependency_graph(
parameters: &QueryPlanningParameters,
has_defers: bool,
non_local_selection_state: &mut Option<non_local_selections_estimation::State>,
) -> Result<(FetchDependencyGraph, QueryPlanCost), FederationError> {
trace!("Starting process to construct a parallel fetch dependency graph");
let selection_set = parameters.operation.selection_set.clone();
let best_plan = compute_root_parallel_best_plan(
parameters,
selection_set,
has_defers,
non_local_selection_state,
)?;
snapshot!(
"FetchDependencyGraph",
best_plan.fetch_dependency_graph.to_dot(),
"Fetch dependency graph returned from compute_root_parallel_best_plan"
);
Ok((best_plan.fetch_dependency_graph, best_plan.cost))
}
/// Runs a planning traversal over `selection` and returns the best plan it finds.
///
/// When the traversal finds no plan, `BestQueryPlanInfo::empty(parameters)` is returned
/// instead.
///
/// # Errors
/// Propagates errors from creating or running the traversal.
fn compute_root_parallel_best_plan(
    parameters: &QueryPlanningParameters,
    selection: SelectionSet,
    has_defers: bool,
    non_local_selection_state: &mut Option<non_local_selections_estimation::State>,
) -> Result<BestQueryPlanInfo, FederationError> {
    let traversal = QueryPlanningTraversal::new(
        parameters,
        selection,
        has_defers,
        parameters.operation.root_kind,
        FetchDependencyGraphToCostProcessor,
        non_local_selection_state.as_mut(),
        None,
    )?;
    match traversal.find_best_plan()? {
        Some(best_plan) => Ok(best_plan),
        None => Ok(BestQueryPlanInfo::empty(parameters)),
    }
}
/// Finds the cheapest plan for `selection` among plans that start from a single subgraph.
///
/// One traversal is run per outgoing edge of the planning head, each restricted to the
/// subgraph that edge leads to. The plan with the lowest cost wins; on equal cost the
/// earliest candidate is kept.
///
/// # Errors
/// Propagates the first error from a graph lookup or traversal. When no candidate plan is
/// found, returns `NoPlanFoundWithDisabledSubgraphs` if any subgraph is disabled and an
/// internal error otherwise.
fn compute_root_parallel_best_plan_for_mutation(
    parameters: &QueryPlanningParameters,
    selection: SelectionSet,
    has_defers: bool,
    non_local_selection_state: &mut Option<non_local_selections_estimation::State>,
) -> Result<BestQueryPlanInfo, FederationError> {
    let query_graph = &parameters.federated_query_graph;
    let mut cheapest: Option<BestQueryPlanInfo> = None;
    for edge_ref in query_graph.out_edges(parameters.head) {
        let mutation_subgraph = query_graph.node_weight(edge_ref.target())?.source.clone();
        let traversal = QueryPlanningTraversal::new(
            parameters,
            selection.clone(),
            has_defers,
            parameters.operation.root_kind,
            FetchDependencyGraphToCostProcessor,
            non_local_selection_state.as_mut(),
            Some(mutation_subgraph),
        )?;
        let Some(candidate) = traversal.find_best_plan()? else {
            // This subgraph cannot plan the selection; try the next one.
            continue;
        };
        // Only replace the current best on a strictly lower cost, so ties keep the first.
        let keep_current = cheapest
            .as_ref()
            .is_some_and(|current| current.cost.total_cmp(&candidate.cost).is_le());
        if !keep_current {
            cheapest = Some(candidate);
        }
    }
    match cheapest {
        Some(best_plan) => Ok(best_plan),
        None if parameters.disabled_subgraphs.is_empty() => Err(FederationError::internal(format!(
            "Was not able to plan {} starting from a single subgraph: This shouldn't have happened.",
            parameters.operation,
        ))),
        None => Err(SingleFederationError::NoPlanFoundWithDisabledSubgraphs.into()),
    }
}
fn compute_plan_internal(
parameters: &mut QueryPlanningParameters,
processor: &mut FetchDependencyGraphToQueryPlanProcessor,
has_defers: bool,
non_local_selection_state: &mut Option<non_local_selections_estimation::State>,
) -> Result<(Option<PlanNode>, QueryPlanCost), FederationError> {
let root_kind = parameters.operation.root_kind;
let (main, deferred, primary_selection, cost) = if root_kind
== SchemaRootDefinitionKind::Mutation
{
let dependency_graphs = compute_root_serial_dependency_graph_for_mutation(
parameters,
has_defers,
non_local_selection_state,
)?;
let mut main = None;
let mut deferred = vec![];
let mut primary_selection = None::<SelectionSet>;
for mut dependency_graph in dependency_graphs {
let (local_main, local_deferred) =
dependency_graph.process(&mut *processor, root_kind)?;
main = match main {
Some(unlocal_main) => processor.reduce_sequence([Some(unlocal_main), local_main]),
None => local_main,
};
deferred.extend(local_deferred);
let new_selection = dependency_graph.defer_tracking.primary_selection;
match primary_selection.as_mut() {
Some(selection) => {
if let Some(new_selection) = new_selection {
selection.add_local_selection_set(&new_selection)?
}
}
None => primary_selection = new_selection,
}
}
(main, deferred, primary_selection, f64::NAN)
} else {
let (mut dependency_graph, cost) = compute_root_parallel_dependency_graph(
parameters,
has_defers,
non_local_selection_state,
)?;
let (main, deferred) = dependency_graph.process(&mut *processor, root_kind)?;
snapshot!(
"FetchDependencyGraph",
dependency_graph.to_dot(),
"Plan after calling FetchDependencyGraph::process"
);
let primary_selection = dependency_graph.defer_tracking.primary_selection;
(main, deferred, primary_selection, cost)
};
if deferred.is_empty() {
Ok((main, cost))
} else {
let Some(primary_selection) = primary_selection else {
unreachable!("Should have had a primary selection created");
};
let reduced_main = processor.reduce_defer(main, &primary_selection, deferred)?;
Ok((reduced_main, cost))
}
}
fn compute_plan_for_defer_conditionals(
parameters: &mut QueryPlanningParameters,
processor: &mut FetchDependencyGraphToQueryPlanProcessor,
defer_conditions: IndexMap<Name, IndexSet<String>>,
non_local_selection_state: &mut Option<non_local_selections_estimation::State>,
) -> Result<(Option<PlanNode>, QueryPlanCost), FederationError> {
generate_condition_nodes(
parameters.operation.clone(),
defer_conditions.iter(),
&mut |op| {
parameters.operation = op;
compute_plan_internal(parameters, processor, true, non_local_selection_state)
},
)
}
fn generate_condition_nodes<'a>(
op: Arc<Operation>,
mut conditions: impl Clone + Iterator<Item = (&'a Name, &'a IndexSet<String>)>,
on_final_operation: &mut impl FnMut(
Arc<Operation>,
) -> Result<(Option<PlanNode>, f64), FederationError>,
) -> Result<(Option<PlanNode>, f64), FederationError> {
match conditions.next() {
None => on_final_operation(op),
Some((cond, labels)) => {
let else_op = Arc::unwrap_or_clone(op.clone()).reduce_defer(labels)?;
let if_op = op;
let (if_node, if_cost) =
generate_condition_nodes(if_op, conditions.clone(), on_final_operation)?;
let (else_node, else_cost) = generate_condition_nodes(
Arc::new(else_op),
conditions.clone(),
on_final_operation,
)?;
let node = ConditionNode {
condition_variable: cond.clone(),
if_clause: if_node.map(Box::new),
else_clause: else_node.map(Box::new),
};
Ok((
Some(PlanNode::Condition(Box::new(node))),
if_cost.max(else_cost),
))
}
}
}
/// How subgraph operations are compressed before being turned into executable documents.
pub(crate) enum SubgraphOperationCompression {
/// Compress the operation by generating fragments (`Operation::generate_fragments`).
GenerateFragments,
/// No compression; the operation is converted to a document as-is.
Disabled,
}
impl SubgraphOperationCompression {
    /// Converts `operation` into a validated executable document, applying the selected
    /// compression mode.
    ///
    /// # Errors
    /// Propagates fragment-generation errors. With compression disabled, a conversion error
    /// that carries an invalid-GraphQL error is re-reported as an internal error, since it
    /// means planning produced an invalid subgraph operation; other errors pass through.
    pub(crate) fn compress(
        &mut self,
        operation: Operation,
    ) -> Result<Valid<ExecutableDocument>, FederationError> {
        match self {
            Self::Disabled => {
                let converted: Result<Valid<ExecutableDocument>, FederationError> =
                    operation.try_into();
                converted.map_err(|err| {
                    if err.has_invalid_graphql_error() {
                        internal_error!(
                            "Query planning produced an invalid subgraph operation.\n{err}"
                        )
                    } else {
                        err
                    }
                })
            }
            Self::GenerateFragments => Ok(operation.generate_fragments()?),
        }
    }
}
#[cfg(test)]
mod tests {
use super::*;
// Supergraph SDL shared by the tests in this module. It declares three subgraphs
// (accounts, products, reviews) through the `join__Graph` enum.
const TEST_SUPERGRAPH: &str = r#"
schema
@link(url: "https://specs.apollo.dev/link/v1.0")
@link(url: "https://specs.apollo.dev/join/v0.2", for: EXECUTION)
{
query: Query
}
directive @join__field(graph: join__Graph!, requires: join__FieldSet, provides: join__FieldSet, type: String, external: Boolean, override: String, usedOverridden: Boolean) repeatable on FIELD_DEFINITION | INPUT_FIELD_DEFINITION
directive @join__graph(name: String!, url: String!) on ENUM_VALUE
directive @join__implements(graph: join__Graph!, interface: String!) repeatable on OBJECT | INTERFACE
directive @join__type(graph: join__Graph!, key: join__FieldSet, extension: Boolean! = false, resolvable: Boolean! = true) repeatable on OBJECT | INTERFACE | UNION | ENUM | INPUT_OBJECT | SCALAR
directive @link(url: String, as: String, for: link__Purpose, import: [link__Import]) repeatable on SCHEMA
type Book implements Product
@join__implements(graph: PRODUCTS, interface: "Product")
@join__implements(graph: REVIEWS, interface: "Product")
@join__type(graph: PRODUCTS, key: "id")
@join__type(graph: REVIEWS, key: "id")
{
id: ID!
price: Price @join__field(graph: PRODUCTS)
title: String @join__field(graph: PRODUCTS)
vendor: User @join__field(graph: PRODUCTS)
pages: Int @join__field(graph: PRODUCTS)
avg_rating: Int @join__field(graph: PRODUCTS, requires: "reviews { rating }")
reviews: [Review] @join__field(graph: PRODUCTS, external: true) @join__field(graph: REVIEWS)
}
enum Currency
@join__type(graph: PRODUCTS)
{
USD
EUR
}
scalar join__FieldSet
enum join__Graph {
ACCOUNTS @join__graph(name: "accounts", url: "")
PRODUCTS @join__graph(name: "products", url: "")
REVIEWS @join__graph(name: "reviews", url: "")
}
scalar link__Import
enum link__Purpose {
"""
`SECURITY` features provide metadata necessary to securely resolve fields.
"""
SECURITY
"""
`EXECUTION` features provide metadata necessary for operation execution.
"""
EXECUTION
}
type Movie implements Product
@join__implements(graph: PRODUCTS, interface: "Product")
@join__implements(graph: REVIEWS, interface: "Product")
@join__type(graph: PRODUCTS, key: "id")
@join__type(graph: REVIEWS, key: "id")
{
id: ID!
price: Price @join__field(graph: PRODUCTS)
title: String @join__field(graph: PRODUCTS)
vendor: User @join__field(graph: PRODUCTS)
length_minutes: Int @join__field(graph: PRODUCTS)
avg_rating: Int @join__field(graph: PRODUCTS, requires: "reviews { rating }")
reviews: [Review] @join__field(graph: PRODUCTS, external: true) @join__field(graph: REVIEWS)
}
type Price
@join__type(graph: PRODUCTS)
{
value: Int
currency: Currency
}
interface Product
@join__type(graph: PRODUCTS)
@join__type(graph: REVIEWS)
{
id: ID!
price: Price @join__field(graph: PRODUCTS)
vendor: User @join__field(graph: PRODUCTS)
avg_rating: Int @join__field(graph: PRODUCTS)
reviews: [Review] @join__field(graph: REVIEWS)
}
type Query
@join__type(graph: ACCOUNTS)
@join__type(graph: PRODUCTS)
@join__type(graph: REVIEWS)
{
userById(id: ID!): User @join__field(graph: ACCOUNTS)
me: User! @join__field(graph: ACCOUNTS) @join__field(graph: REVIEWS)
productById(id: ID!): Product @join__field(graph: PRODUCTS)
search(filter: SearchFilter): [Product] @join__field(graph: PRODUCTS)
bestRatedProducts(limit: Int): [Product] @join__field(graph: REVIEWS)
}
type Review
@join__type(graph: PRODUCTS)
@join__type(graph: REVIEWS)
{
rating: Int @join__field(graph: PRODUCTS, external: true) @join__field(graph: REVIEWS)
product: Product @join__field(graph: REVIEWS)
author: User @join__field(graph: REVIEWS)
text: String @join__field(graph: REVIEWS)
}
input SearchFilter
@join__type(graph: PRODUCTS)
{
pattern: String!
vendorName: String
}
type User
@join__type(graph: ACCOUNTS, key: "id")
@join__type(graph: PRODUCTS, key: "id", resolvable: false)
@join__type(graph: REVIEWS, key: "id")
{
id: ID!
name: String @join__field(graph: ACCOUNTS)
email: String @join__field(graph: ACCOUNTS)
password: String @join__field(graph: ACCOUNTS)
nickname: String @join__field(graph: ACCOUNTS, override: "reviews")
reviews: [Review] @join__field(graph: REVIEWS)
}
"#;
// A query whose fields all belong to the accounts subgraph is expected to plan as a single
// Fetch against that subgraph.
#[test]
fn plan_simple_query_for_single_subgraph() {
let supergraph = Supergraph::new(TEST_SUPERGRAPH).unwrap();
let planner = QueryPlanner::new(&supergraph, Default::default()).unwrap();
let document = ExecutableDocument::parse_and_validate(
planner.api_schema().schema(),
r#"
{
userById(id: 1) {
name
email
}
}
"#,
"operation.graphql",
)
.unwrap();
let plan = planner
.build_query_plan(&document, None, Default::default())
.unwrap();
insta::assert_snapshot!(plan, @r###"
QueryPlan {
Fetch(service: "accounts") {
{
userById(id: 1) {
name
email
}
}
},
}
"###);
}
// A single root field whose nested selections live in other subgraphs is expected to plan as
// a Sequence: reviews first, then flattened entity fetches to products and accounts.
#[test]
fn plan_simple_query_for_multiple_subgraphs() {
let supergraph = Supergraph::new(TEST_SUPERGRAPH).unwrap();
let planner = QueryPlanner::new(&supergraph, Default::default()).unwrap();
let document = ExecutableDocument::parse_and_validate(
planner.api_schema().schema(),
r#"
{
bestRatedProducts {
vendor { name }
}
}
"#,
"operation.graphql",
)
.unwrap();
let plan = planner
.build_query_plan(&document, None, Default::default())
.unwrap();
insta::assert_snapshot!(plan, @r###"
QueryPlan {
Sequence {
Fetch(service: "reviews") {
{
bestRatedProducts {
__typename
... on Book {
__typename
id
}
... on Movie {
__typename
id
}
}
}
},
Flatten(path: "bestRatedProducts.@") {
Fetch(service: "products") {
{
... on Book {
__typename
id
}
... on Movie {
__typename
id
}
} =>
{
... on Book {
vendor {
__typename
id
}
}
... on Movie {
vendor {
__typename
id
}
}
}
},
},
Flatten(path: "bestRatedProducts.@.vendor") {
Fetch(service: "accounts") {
{
... on User {
__typename
id
}
} =>
{
... on User {
name
}
}
},
},
},
}
"###);
}
// Two root fields served by different subgraphs are expected to plan as a Parallel node: a
// plain Fetch to accounts next to a Sequence that resolves the `@requires` for `avg_rating`.
#[test]
fn plan_simple_root_field_query_for_multiple_subgraphs() {
let supergraph = Supergraph::new(TEST_SUPERGRAPH).unwrap();
let planner = QueryPlanner::new(&supergraph, Default::default()).unwrap();
let document = ExecutableDocument::parse_and_validate(
planner.api_schema().schema(),
r#"
{
userById(id: 1) {
name
email
}
bestRatedProducts {
id
avg_rating
}
}
"#,
"operation.graphql",
)
.unwrap();
let plan = planner
.build_query_plan(&document, None, Default::default())
.unwrap();
insta::assert_snapshot!(plan, @r###"
QueryPlan {
Parallel {
Fetch(service: "accounts") {
{
userById(id: 1) {
name
email
}
}
},
Sequence {
Fetch(service: "reviews") {
{
bestRatedProducts {
__typename
id
... on Book {
__typename
id
reviews {
rating
}
}
... on Movie {
__typename
id
reviews {
rating
}
}
}
}
},
Flatten(path: "bestRatedProducts.@") {
Fetch(service: "products") {
{
... on Book {
__typename
id
reviews {
rating
}
}
... on Movie {
__typename
id
reviews {
rating
}
}
} =>
{
... on Book {
avg_rating
}
... on Movie {
avg_rating
}
}
},
},
},
},
}
"###);
}
// With `generate_query_fragments` enabled, this operation is still expected to produce a
// subgraph fetch without any fragment: the input fragment is inlined and none is generated.
#[test]
fn test_optimize_no_fragments_generated() {
let supergraph = Supergraph::new(TEST_SUPERGRAPH).unwrap();
let api_schema = supergraph.to_api_schema(Default::default()).unwrap();
let document = ExecutableDocument::parse_and_validate(
api_schema.schema(),
r#"
{
userById(id: 1) {
id
...userFields
},
another_user: userById(id: 2) {
name
email
}
}
fragment userFields on User {
name
email
}
"#,
"operation.graphql",
)
.unwrap();
let config = QueryPlannerConfig {
generate_query_fragments: true,
..Default::default()
};
let planner = QueryPlanner::new(&supergraph, config).unwrap();
let plan = planner
.build_query_plan(&document, None, Default::default())
.unwrap();
insta::assert_snapshot!(plan, @r###"
QueryPlan {
Fetch(service: "accounts") {
{
userById(id: 1) {
id
name
email
}
another_user: userById(id: 2) {
name
email
}
}
},
}
"###);
}
// A `__typename` selected directly on the operation root is expected to be absent from the
// subgraph fetch.
#[test]
fn drop_operation_root_level_typename() {
let supergraph = Supergraph::new(TEST_SUPERGRAPH).unwrap();
let planner = QueryPlanner::new(&supergraph, Default::default()).unwrap();
let document = ExecutableDocument::parse_and_validate(
planner.api_schema().schema(),
r#"
{
__typename
bestRatedProducts {
id
}
}
"#,
"operation.graphql",
)
.unwrap();
let plan = planner
.build_query_plan(&document, None, Default::default())
.unwrap();
insta::assert_snapshot!(plan, @r###"
QueryPlan {
Fetch(service: "reviews") {
{
bestRatedProducts {
__typename
id
}
}
},
}
"###);
}
// A NaN `best_plan_cost` is expected to serialize to JSON `null` and to deserialize back to
// NaN (through `deserialize_f64_nullable`).
#[test]
fn test_query_plan_statistics_nan_cost() {
let stats = QueryPlanningStatistics {
evaluated_plan_count: Cell::new(10),
evaluated_plan_paths: Cell::new(20),
best_plan_cost: f64::NAN,
};
let serialized = serde_json::to_string_pretty(&stats).expect("Serializing");
insta::assert_snapshot!(serialized, @r###"
{
"evaluated_plan_count": 10,
"evaluated_plan_paths": 20,
"best_plan_cost": null
}
"###);
let deserialized: QueryPlanningStatistics =
serde_json::from_str(&serialized).expect("Deserializing");
assert!(deserialized.best_plan_cost.is_nan());
}
}