apollo-federation 2.16.0

Apollo Federation
Documentation
use apollo_compiler::Name;
use apollo_compiler::ast::DirectiveDefinition;
use apollo_compiler::collections::IndexMap;
use itertools::Itertools;
use tracing::trace;

use crate::bail;
use crate::error::CompositionError;
use crate::error::FederationError;
use crate::internal_error;
use crate::link::Import;
use crate::link::Link;
use crate::link::authenticated_spec_definition::AUTHENTICATED_DIRECTIVE_NAME_IN_SPEC;
use crate::link::metadata::LinkedElement;
use crate::link::policy_spec_definition::POLICY_DIRECTIVE_NAME_IN_SPEC;
use crate::link::requires_scopes_spec_definition::REQUIRES_SCOPES_DIRECTIVE_NAME_IN_SPEC;
use crate::link::spec::Identity;
use crate::link::spec::Url;
use crate::link::spec_definition::SpecDefinition;
use crate::link::spec_registry::SPEC_REGISTRY;
use crate::merger::merge::MergedDirectiveInfo;
use crate::merger::merge::Merger;
use crate::schema::type_and_directive_specification::DirectiveCompositionSpecification;

pub(crate) struct CoreDirectiveInSubgraphs {
    url: Url,
    name: Name,
    definitions_per_subgraph: IndexMap<String, DirectiveDefinition>,
    composition_spec: DirectiveCompositionSpecification,
}

impl std::fmt::Debug for CoreDirectiveInSubgraphs {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("CoreDirectiveInSubgraphs")
            .field("url", &self.url)
            .field("name", &self.name)
            .field("definitions_per_subgraph", &self.definitions_per_subgraph)
            .finish()
    }
}

pub(crate) type SupergraphInfoByIdentity = IndexMap<Identity, SupergraphInfo>;

pub(crate) struct SupergraphInfo {
    pub(crate) spec_in_supergraph: &'static dyn SpecDefinition,
    pub(crate) directives: Vec<SupergraphDirectiveInfo>,
}

pub(crate) struct SupergraphDirectiveInfo {
    pub(crate) name_in_feature: Name,
    pub(crate) name_in_supergraph: Name,
    pub(crate) composition_spec: DirectiveCompositionSpecification,
}

impl Merger {
    pub(crate) fn collect_core_directives_to_compose(
        &self,
    ) -> Result<Vec<CoreDirectiveInSubgraphs>, FederationError> {
        trace!("Collecting core directives used in subgraphs");
        // Groups directives by their feature and major version (we use negative numbers for
        // pre-1.0 version numbers on the minor, since all minors are incompatible).
        let mut directives_per_feature_and_version: IndexMap<
            String,
            IndexMap<i32, CoreDirectiveInSubgraphs>,
        > = IndexMap::default();

        for subgraph in &self.subgraphs {
            let Some(features) = subgraph.schema().metadata() else {
                bail!("Subgraphs should be core schemas")
            };

            for (directive, referencers) in &subgraph.schema().referencers().directives {
                let Some(LinkedElement {
                    link: source,
                    name_in_spec,
                }) = features.source_link_of_directive(directive)
                else {
                    continue;
                };
                let Some(name_in_spec) = name_in_spec else {
                    continue;
                };
                if referencers.is_empty() {
                    continue;
                }
                let Some(composition_spec) =
                    SPEC_REGISTRY.get_composition_spec(&source, &name_in_spec)
                else {
                    trace!(
                        "Directive @{directive} from {} has no registered composition spec, skipping",
                        source.url
                    );
                    continue;
                };
                let Some(definition) = subgraph
                    .schema()
                    .schema()
                    .directive_definitions
                    .get(directive)
                else {
                    bail!(
                        "Missing directive definition for @{directive} in {}",
                        source
                    )
                };

                let fqn = format!("{}-{}", name_in_spec, source.url.identity);
                let for_feature = directives_per_feature_and_version.entry(fqn).or_default();

                let major = if source.url.version.major > 0 {
                    source.url.version.major as i32
                } else {
                    -(source.url.version.minor as i32)
                };

                if let Some(for_version) = for_feature.get_mut(&major) {
                    if source.url.version > for_version.url.version {
                        for_version.url = source.url.clone();
                    }
                    for_version
                        .definitions_per_subgraph
                        .insert(subgraph.name.clone(), (**definition).clone());
                } else {
                    let mut definitions_per_subgraph = IndexMap::default();
                    definitions_per_subgraph.insert(subgraph.name.clone(), (**definition).clone());
                    let for_version = CoreDirectiveInSubgraphs {
                        url: source.url.clone(),
                        name: name_in_spec.clone(),
                        definitions_per_subgraph,
                        composition_spec,
                    };
                    for_feature.insert(major, for_version);
                }
            }
        }

        Ok(directives_per_feature_and_version
            .into_iter()
            .flat_map(|(_, values)| values.into_values())
            .collect())
    }

    pub(crate) fn validate_core_directive_info(
        &mut self,
        directives_merge_info: Vec<CoreDirectiveInSubgraphs>,
    ) -> Result<SupergraphInfoByIdentity, FederationError> {
        let mut supergraph_info_by_identity: IndexMap<Identity, SupergraphInfo> =
            Default::default();

        trace!("Determining supergraph names for directives used in subgraphs");
        for subgraph_core_directive in directives_merge_info {
            let mut name_in_supergraph: Option<&Name> = None;
            for subgraph in &self.subgraphs {
                let Some(directive) = subgraph_core_directive
                    .definitions_per_subgraph
                    .get(&subgraph.name)
                else {
                    continue;
                };

                if name_in_supergraph.is_none() {
                    name_in_supergraph = Some(&directive.name);
                } else if name_in_supergraph.is_some_and(|n| *n != directive.name) {
                    let definition_sources = self
                        .subgraphs
                        .iter()
                        .enumerate()
                        .map(|(idx, s)| {
                            (
                                idx,
                                subgraph_core_directive
                                    .definitions_per_subgraph
                                    .get(&s.name),
                            )
                        })
                        .collect();
                    self.error_reporter.report_mismatch_error(
                        CompositionError::LinkImportNameMismatch {
                            message: format!("The \"@{}\" directive (from {}) is imported with mismatched name between subgraphs: it is imported as ", directive.name, subgraph_core_directive.url),
                        },
                        &directive,
                        &definition_sources,
                        &self.subgraphs,
                        |def| Some(format!("\"@{}\"", def.name)),
                        |def, _| Some(format!("\"@{}\"", def.name)),
                    );
                    return Ok(supergraph_info_by_identity);
                }
            }

            // If we get here with `name_in_supergraph` unset, it means there is no usage for the
            // directive at all, and we don't bother adding the spec to the supergraph.
            let Some(name_in_supergraph) = name_in_supergraph else {
                trace!(
                    "Directive @{} is not used in any subgraph, skipping",
                    subgraph_core_directive.name
                );
                continue;
            };
            let Some(spec_in_supergraph) =
                (subgraph_core_directive
                    .composition_spec
                    .supergraph_specification)(&self.latest_federation_version_used)
            else {
                trace!(
                    "Directive @{name_in_supergraph} has no registered composition spec, skipping"
                );
                continue;
            };

            let supergraph_info = supergraph_info_by_identity
                .entry(spec_in_supergraph.identity().clone())
                .or_insert(SupergraphInfo {
                    spec_in_supergraph,
                    directives: Default::default(),
                });

            if supergraph_info.spec_in_supergraph.url() != spec_in_supergraph.url() {
                bail!(
                    "Spec {} directives disagree on version for supergraph",
                    spec_in_supergraph.url()
                )
            }

            if !supergraph_info
                .directives
                .iter()
                .any(|d| d.name_in_feature == subgraph_core_directive.name)
            {
                supergraph_info.directives.push(SupergraphDirectiveInfo {
                    name_in_feature: subgraph_core_directive.name.clone(),
                    name_in_supergraph: name_in_supergraph.clone(),
                    composition_spec: subgraph_core_directive.composition_spec.clone(),
                });
            }

            if subgraph_core_directive.composition_spec.use_join_directive {
                self.directives_using_join_directive
                    .insert(name_in_supergraph.clone());
            }
        }

        Ok(supergraph_info_by_identity)
    }

    pub(crate) fn maybe_add_specs(
        &mut self,
        supergraph_info_by_identity: &SupergraphInfoByIdentity,
    ) -> Result<(), FederationError> {
        for SupergraphInfo {
            spec_in_supergraph,
            directives,
        } in supergraph_info_by_identity.values()
        {
            let spec_name_in_schema = if self
                .merged
                .metadata()
                .map(|metadata| {
                    metadata.is_spec_name_in_schema_valid(
                        &spec_in_supergraph.identity().name,
                        spec_in_supergraph.identity(),
                        &self.import_conflicts_by_identity,
                    )
                })
                .unwrap_or(false)
            {
                Name::new_unchecked(&spec_in_supergraph.identity().name)
            } else {
                self.compute_unique_spec_name_in_schema
                    .as_mut()
                    .ok_or_else(|| {
                        internal_error!(
                            "compute_unique_spec_name_in_schema() unexpectedly uninitialized"
                        )
                    })?(&spec_in_supergraph.identity().name)
            };
            let mut imports = Vec::new();
            for supergraph_core_directive in directives {
                // Directives composed via @join__directive are not imported in the supergraph schema.
                if supergraph_core_directive
                    .composition_spec
                    .use_join_directive
                {
                    continue;
                }
                let default_name_in_supergraph = Link::directive_name_in_schema_for_core_arguments(
                    spec_in_supergraph.url(),
                    &spec_name_in_schema,
                    &[],
                    &supergraph_core_directive.name_in_feature,
                );
                if supergraph_core_directive.name_in_supergraph != default_name_in_supergraph {
                    let alias = if supergraph_core_directive.name_in_feature
                        == supergraph_core_directive.name_in_supergraph
                    {
                        None
                    } else {
                        Some(supergraph_core_directive.name_in_supergraph.clone())
                    };
                    imports.push(Import {
                        element: supergraph_core_directive.name_in_feature.clone(),
                        is_directive: true,
                        alias,
                    });
                }
            }

            self.link_spec_definition.apply_feature_to_schema(
                &mut self.merged,
                *spec_in_supergraph,
                Some(spec_name_in_schema).take_if(|spec_name_in_schema| {
                    spec_name_in_schema.as_str() != spec_in_supergraph.identity().name.as_ref()
                }),
                spec_in_supergraph.purpose(),
                Some(imports),
                |error| Self::push_non_internal_errors(&mut self.error_reporter, error),
            )?;

            let Some(links_metadata) = self.merged.metadata() else {
                bail!("Missing links metadata in supergraph schema");
            };
            let feature = links_metadata.for_identity(&spec_in_supergraph.url().identity);
            for supergraph_core_directive in directives {
                let arguments_merger = if let Some(merger_factory) = supergraph_core_directive
                    .composition_spec
                    .argument_merger
                    .as_ref()
                {
                    Some(merger_factory(&self.merged, feature.as_ref())?)
                } else {
                    None
                };
                self.merged_federation_directive_names
                    .insert(supergraph_core_directive.name_in_supergraph.to_string());
                self.merged_federation_directive_in_supergraph_by_directive_name
                    .insert(
                        supergraph_core_directive.name_in_supergraph.clone(),
                        MergedDirectiveInfo {
                            arguments_merger,
                            static_argument_transform: supergraph_core_directive
                                .composition_spec
                                .static_argument_transform
                                .clone(),
                        },
                    );
                // If we encounter the @inaccessible directive, we need to record its definition so
                // certain merge validations that care about @inaccessible can act accordingly.
                if *spec_in_supergraph.identity() == Identity::inaccessible_identity()
                    && supergraph_core_directive.name_in_feature == Identity::INACCESSIBLE_NAME
                {
                    self.inaccessible_directive_name_in_supergraph =
                        Some(supergraph_core_directive.name_in_supergraph.clone());
                }

                if *spec_in_supergraph.identity() == Identity::authenticated_identity()
                    && supergraph_core_directive.name_in_feature == Identity::AUTHENTICATED_NAME
                {
                    self.access_control_directives_in_supergraph.push((
                        AUTHENTICATED_DIRECTIVE_NAME_IN_SPEC,
                        supergraph_core_directive.name_in_supergraph.clone(),
                    ));
                }
                if *spec_in_supergraph.identity() == Identity::requires_scopes_identity()
                    && supergraph_core_directive.name_in_feature == Identity::REQUIRES_SCOPES_NAME
                {
                    self.access_control_directives_in_supergraph.push((
                        REQUIRES_SCOPES_DIRECTIVE_NAME_IN_SPEC,
                        supergraph_core_directive.name_in_supergraph.clone(),
                    ));
                }
                if *spec_in_supergraph.identity() == Identity::policy_identity()
                    && supergraph_core_directive.name_in_feature == Identity::POLICY_NAME
                {
                    self.access_control_directives_in_supergraph.push((
                        POLICY_DIRECTIVE_NAME_IN_SPEC,
                        supergraph_core_directive.name_in_supergraph.clone(),
                    ));
                }
            }
        }
        trace!(
            "The following federation directives will be merged if applications are found: {}",
            self.merged_federation_directive_names.iter().join(", ")
        );

        Ok(())
    }
}