use std::collections::HashMap;
use std::sync::Arc;
use apollo_compiler::Name;
use apollo_compiler::ast::DirectiveDefinition;
use apollo_compiler::collections::IndexMap;
use itertools::Itertools;
use tracing::trace;
use crate::bail;
use crate::error::CompositionError;
use crate::error::FederationError;
use crate::link::Import;
use crate::link::Link;
use crate::link::authenticated_spec_definition::AUTHENTICATED_DIRECTIVE_NAME_IN_SPEC;
use crate::link::policy_spec_definition::POLICY_DIRECTIVE_NAME_IN_SPEC;
use crate::link::requires_scopes_spec_definition::REQUIRES_SCOPES_DIRECTIVE_NAME_IN_SPEC;
use crate::link::spec::Identity;
use crate::link::spec::Url;
use crate::link::spec_definition::SPEC_REGISTRY;
use crate::link::spec_definition::SpecDefinition;
use crate::merger::merge::MergedDirectiveInfo;
use crate::merger::merge::Merger;
use crate::schema::type_and_directive_specification::DirectiveCompositionSpecification;
pub(crate) struct CoreDirectiveInSubgraphs {
url: Url,
name: Name,
definitions_per_subgraph: IndexMap<String, DirectiveDefinition>,
composition_spec: DirectiveCompositionSpecification,
}
impl std::fmt::Debug for CoreDirectiveInSubgraphs {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("CoreDirectiveInSubgraphs")
.field("url", &self.url)
.field("name", &self.name)
.field("definitions_per_subgraph", &self.definitions_per_subgraph)
.finish()
}
}
struct CoreDirectiveInSupergraph {
spec_in_supergraph: &'static dyn SpecDefinition,
name_in_feature: Name,
name_in_supergraph: Name,
composition_spec: DirectiveCompositionSpecification,
}
impl Merger {
pub(crate) fn collect_core_directives_to_compose(
&self,
) -> Result<Vec<CoreDirectiveInSubgraphs>, FederationError> {
trace!("Collecting core directives used in subgraphs");
let mut directives_per_feature_and_version: HashMap<
String,
HashMap<i32, CoreDirectiveInSubgraphs>,
> = HashMap::new();
for subgraph in &self.subgraphs {
let Some(features) = subgraph.schema().metadata() else {
bail!("Subgraphs should be core schemas")
};
for (directive, referencers) in &subgraph.schema().referencers().directives {
let Some(linked_elem) = features.source_link_of_directive(directive) else {
continue;
};
if referencers.is_empty() {
continue;
}
let source = linked_elem.link;
let import = match linked_elem.import {
Some(import) => import,
None => {
let Some((_, directive_name_in_spec)) = directive.split_once("__") else {
continue;
};
let Ok(element_name) = Name::new(directive_name_in_spec) else {
continue;
};
Arc::new(Import {
element: element_name,
is_directive: true,
alias: None,
})
}
};
let Some(composition_spec) = SPEC_REGISTRY.get_composition_spec(&source, &import)
else {
trace!(
"Directive @{directive} from {} has no registered composition spec, skipping",
source.url
);
continue;
};
let Some(definition) = subgraph
.schema()
.schema()
.directive_definitions
.get(directive)
else {
bail!(
"Missing directive definition for @{directive} in {}",
source
)
};
let fqn = format!("{}-{}", import.element, source.url.identity);
let for_feature = directives_per_feature_and_version.entry(fqn).or_default();
let major = if source.url.version.major > 0 {
source.url.version.major as i32
} else {
-(source.url.version.minor as i32)
};
if let Some(for_version) = for_feature.get_mut(&major) {
if source.url.version > for_version.url.version {
for_version.url = source.url.clone();
}
for_version
.definitions_per_subgraph
.insert(subgraph.name.clone(), (**definition).clone());
} else {
let mut definitions_per_subgraph = IndexMap::default();
definitions_per_subgraph.insert(subgraph.name.clone(), (**definition).clone());
let for_version = CoreDirectiveInSubgraphs {
url: source.url.clone(),
name: import.element.clone(),
definitions_per_subgraph,
composition_spec,
};
for_feature.insert(major, for_version);
}
}
}
Ok(directives_per_feature_and_version
.into_iter()
.flat_map(|(_, values)| values.into_values())
.collect())
}
pub(crate) fn validate_and_maybe_add_specs(
&mut self,
directives_merge_info: &[CoreDirectiveInSubgraphs],
) -> Result<(), FederationError> {
let mut supergraph_info_by_identity: HashMap<Identity, Vec<CoreDirectiveInSupergraph>> =
HashMap::new();
trace!("Determining supergraph names for directives used in subgraphs");
for subgraph_core_directive in directives_merge_info {
let mut name_in_supergraph: Option<&Name> = None;
for subgraph in &self.subgraphs {
let Some(directive) = subgraph_core_directive
.definitions_per_subgraph
.get(&subgraph.name)
else {
continue;
};
if name_in_supergraph.is_none() {
name_in_supergraph = Some(&directive.name);
} else if name_in_supergraph.is_some_and(|n| *n != directive.name) {
let definition_sources = self
.subgraphs
.iter()
.enumerate()
.map(|(idx, s)| {
(
idx,
subgraph_core_directive
.definitions_per_subgraph
.get(&s.name),
)
})
.collect();
self.error_reporter.report_mismatch_error(
CompositionError::LinkImportNameMismatch {
message: format!("The \"@{}\" directive (from {}) is imported with mismatched name between subgraphs: it is imported as ", directive.name, subgraph_core_directive.url),
},
&directive,
&definition_sources,
&self.subgraphs,
|def| Some(format!("\"@{}\"", def.name)),
|def, _| Some(format!("\"@{}\"", def.name)),
);
return Ok(());
}
}
let Some(name_in_supergraph) = name_in_supergraph else {
trace!(
"Directive @{} is not used in any subgraph, skipping",
subgraph_core_directive.name
);
continue;
};
let Some(spec_in_supergraph) =
(subgraph_core_directive
.composition_spec
.supergraph_specification)(&self.latest_federation_version_used)
else {
trace!(
"Directive @{name_in_supergraph} has no registered composition spec, skipping"
);
continue;
};
let supergraph_info = supergraph_info_by_identity
.entry(spec_in_supergraph.identity().clone())
.or_default();
if !supergraph_info
.iter()
.any(|d| d.name_in_feature == subgraph_core_directive.name)
{
supergraph_info.push(CoreDirectiveInSupergraph {
spec_in_supergraph,
name_in_feature: subgraph_core_directive.name.clone(),
name_in_supergraph: name_in_supergraph.clone(),
composition_spec: subgraph_core_directive.composition_spec.clone(),
});
}
if supergraph_info
.iter()
.any(|s| s.spec_in_supergraph.url() != spec_in_supergraph.url())
{
bail!(
"Spec {} directives disagree on version for supergraph",
spec_in_supergraph.url()
)
}
if subgraph_core_directive.composition_spec.use_join_directive {
self.directives_using_join_directive
.insert(name_in_supergraph.clone());
}
}
for supergraph_core_directives in supergraph_info_by_identity.values() {
let mut imports = Vec::new();
for supergraph_core_directive in supergraph_core_directives {
if supergraph_core_directive
.composition_spec
.use_join_directive
{
continue;
}
let default_name_in_supergraph = Link::directive_name_in_schema_for_core_arguments(
supergraph_core_directive.spec_in_supergraph.url(),
&supergraph_core_directive
.spec_in_supergraph
.url()
.identity
.name,
&[],
&supergraph_core_directive.name_in_feature,
);
if supergraph_core_directive.name_in_supergraph != default_name_in_supergraph {
let alias = if supergraph_core_directive.name_in_feature
== supergraph_core_directive.name_in_supergraph
{
None
} else {
Some(supergraph_core_directive.name_in_supergraph.clone())
};
imports.push(Import {
element: supergraph_core_directive.name_in_feature.clone(),
is_directive: true,
alias,
});
}
}
self.link_spec_definition.apply_feature_to_schema(
&mut self.merged,
supergraph_core_directives[0].spec_in_supergraph,
None,
supergraph_core_directives[0].spec_in_supergraph.purpose(),
Some(imports),
)?;
let Some(links_metadata) = self.merged.metadata() else {
bail!("Missing links metadata in supergraph schema");
};
let feature = links_metadata.for_identity(
&supergraph_core_directives[0]
.spec_in_supergraph
.url()
.identity,
);
for supergraph_core_directive in supergraph_core_directives {
let arguments_merger = if let Some(merger_factory) = supergraph_core_directive
.composition_spec
.argument_merger
.as_ref()
{
Some(merger_factory(&self.merged, feature.as_ref())?)
} else {
None
};
self.merged_federation_directive_names
.insert(supergraph_core_directive.name_in_supergraph.to_string());
self.merged_federation_directive_in_supergraph_by_directive_name
.insert(
supergraph_core_directive.name_in_supergraph.clone(),
MergedDirectiveInfo {
arguments_merger,
static_argument_transform: supergraph_core_directive
.composition_spec
.static_argument_transform
.clone(),
},
);
if *supergraph_core_directive.spec_in_supergraph.identity()
== Identity::inaccessible_identity()
&& supergraph_core_directive.name_in_feature
== supergraph_core_directive
.spec_in_supergraph
.url()
.identity
.name
{
self.inaccessible_directive_name_in_supergraph =
Some(supergraph_core_directive.name_in_supergraph.clone());
}
if *supergraph_core_directive.spec_in_supergraph.identity()
== Identity::authenticated_identity()
&& supergraph_core_directive.name_in_feature
== supergraph_core_directive
.spec_in_supergraph
.url()
.identity
.name
{
self.access_control_directives_in_supergraph.push((
AUTHENTICATED_DIRECTIVE_NAME_IN_SPEC,
supergraph_core_directive.name_in_supergraph.clone(),
));
}
if *supergraph_core_directive.spec_in_supergraph.identity()
== Identity::requires_scopes_identity()
&& supergraph_core_directive.name_in_feature
== supergraph_core_directive
.spec_in_supergraph
.url()
.identity
.name
{
self.access_control_directives_in_supergraph.push((
REQUIRES_SCOPES_DIRECTIVE_NAME_IN_SPEC,
supergraph_core_directive.name_in_supergraph.clone(),
));
}
if *supergraph_core_directive.spec_in_supergraph.identity()
== Identity::policy_identity()
&& supergraph_core_directive.name_in_feature
== supergraph_core_directive
.spec_in_supergraph
.url()
.identity
.name
{
self.access_control_directives_in_supergraph.push((
POLICY_DIRECTIVE_NAME_IN_SPEC,
supergraph_core_directive.name_in_supergraph.clone(),
));
}
}
}
trace!(
"The following federation directives will be merged if applications are found: {}",
self.merged_federation_directive_names.iter().join(", ")
);
Ok(())
}
}