use apollo_compiler::{schema::ExtendedType, Schema};
use apollo_federation::composition::{
expand_subgraphs, merge_subgraphs, post_merge_validations, pre_merge_validations,
upgrade_subgraphs_if_necessary, validate_satisfiability, CompositionOptions, Supergraph,
};
use apollo_federation::connectors::{
expand::{expand_connectors, Connectors, ExpansionResult},
validation::{validate, Severity as ValidationSeverity, ValidationResult},
Connector,
};
use apollo_federation::internal_composition_api::validate_cache_tag_directives;
use apollo_federation::subgraph::typestate::{Initial, Subgraph, Validated};
use apollo_federation::subgraph::SubgraphError;
use apollo_federation_types::build_plugin::PluginResult;
use apollo_federation_types::composition::{MergeResult, SubgraphLocation};
use apollo_federation_types::{
composition::{Issue, Severity},
javascript::SubgraphDefinition,
};
use std::collections::HashMap;
use std::iter::once;
use std::sync::Arc;
/// Composition driver that combines steps supplied by the implementor
/// (composition without satisfiability, satisfiability validation, SDL storage
/// and issue collection) with the cache-tag, connector and override handling
/// implemented in this file.
///
/// Two entry points are provided:
/// - [`HybridComposition::compose`] reports everything through `add_issues`
///   and relies on the implementor's required methods.
/// - [`HybridComposition::experimental_compose`] consumes `self` and runs the
///   upgrade / merge / satisfiability steps through the `experimental_*`
///   provided methods, returning a `PluginResult` or the list of errors.
#[allow(async_fn_in_trait)]
pub trait HybridComposition {
/// Composes `subgraph_definitions` without checking satisfiability.
///
/// Returns the resulting supergraph SDL, or `None` when no SDL is available.
/// `compose` stops on `None` without adding any issue of its own.
/// NOTE(review): presumably the implementor records its own issues before
/// returning `None` — confirm against implementations.
async fn compose_services_without_satisfiability(
&mut self,
subgraph_definitions: Vec<SubgraphDefinition>,
) -> Option<SupergraphSdl<'_>>;
/// Runs satisfiability validation.
///
/// `compose` forwards the contents of either variant to `add_issues`
/// (`Ok` is treated as hints, `Err` as errors).
async fn validate_satisfiability(&mut self) -> Result<Vec<Issue>, Vec<Issue>>;
/// Replaces the supergraph SDL held by the implementor.
///
/// `compose` calls this with the connector-expanded SDL before
/// `validate_satisfiability` and with the originally composed SDL afterwards.
fn update_supergraph_sdl(&mut self, supergraph_sdl: String);
/// Records the given issues (errors and hints alike) with the implementor.
fn add_issues<Source: Iterator<Item = Issue>>(&mut self, issues: Source);
/// Runs the full composition pipeline, reporting every outcome through
/// `add_issues` / `update_supergraph_sdl` rather than a return value.
///
/// Order of steps: cache-tag validation, connector validation, composition
/// without satisfiability, override validation, connector expansion, and
/// finally satisfiability validation. Each fatal step returns early.
async fn compose(&mut self, subgraph_definitions: Vec<SubgraphDefinition>) {
// Cache-tag validation runs first; any issue it reports ends composition.
if let Err(cache_tag_errors) = validate_cache_tag_in_subgraphs(&subgraph_definitions) {
self.add_issues(cache_tag_errors.into_iter());
return;
}
// Connector validation: errors end composition, hints are recorded and
// composition continues with the transformed subgraph definitions.
let ConnectorsValidationResult {
subgraphs,
parsed_subgraphs,
hints: connector_hints,
} = match validate_connector_subgraphs(subgraph_definitions) {
Ok(results) => results,
Err(errors) => {
self.add_issues(errors.into_iter());
return;
}
};
self.add_issues(connector_hints.into_iter());
// Without a composed SDL there is nothing further to do here.
let Some(supergraph_sdl) = self
.compose_services_without_satisfiability(subgraphs)
.await
else {
return;
};
// Override problems are reported only after composition produced an SDL;
// any such error ends composition before expansion is attempted.
let override_errors = validate_overrides(parsed_subgraphs);
if !override_errors.is_empty() {
self.add_issues(override_errors.into_iter());
return;
}
// An expansion failure is surfaced as a single INTERNAL_ERROR issue.
let expansion_result = match expand_connectors(supergraph_sdl, &Default::default()) {
Ok(result) => result,
Err(err) => {
self.add_issues(once(Issue {
code: "INTERNAL_ERROR".to_string(),
message: format!(
"Composition failed due to an internal error when expanding connectors, please report this: {err}"
),
locations: vec![],
severity: Severity::Error,
}));
return;
}
};
match expansion_result {
ExpansionResult::Expanded {
raw_sdl,
connectors: Connectors {
by_service_name, ..
},
..
} => {
// Keep an owned copy of the composed SDL so it can be put back once
// satisfiability has been checked against the expanded SDL.
let original_supergraph_sdl = supergraph_sdl.to_string();
self.update_supergraph_sdl(raw_sdl);
let satisfiability_result = self.validate_satisfiability().await;
// Rewrite each message so names keyed in `by_service_name` are replaced
// by the owning connector's subgraph name before the issue is recorded.
self.add_issues(
satisfiability_result_into_issues(satisfiability_result).map(|mut issue| {
sanitize_connectors_issue(&mut issue, by_service_name.iter());
issue
}),
);
// Restore the SDL that was composed before expansion.
self.update_supergraph_sdl(original_supergraph_sdl);
}
ExpansionResult::Unchanged => {
// Nothing was expanded: validate the composed SDL as it stands.
let satisfiability_result = self.validate_satisfiability().await;
self.add_issues(satisfiability_result_into_issues(satisfiability_result));
}
}
}
/// Runs the composition pipeline using the `experimental_*` steps of this
/// trait and returns the outcome instead of reporting through `add_issues`.
///
/// Consumes `self`. On success the `PluginResult` carries the merged
/// (non-expanded) supergraph SDL together with the collected build messages.
///
/// # Errors
/// Returns the issues of the first failing step: cache-tag validation,
/// connector validation, subgraph upgrade, merge, override validation,
/// connector expansion, or satisfiability validation.
async fn experimental_compose(
mut self,
subgraph_definitions: Vec<SubgraphDefinition>,
) -> Result<PluginResult, Vec<Issue>>
where
Self: Sized,
{
validate_cache_tag_in_subgraphs(&subgraph_definitions)?;
let ConnectorsValidationResult {
subgraphs: connected_subgraphs,
parsed_subgraphs,
hints: connector_hints,
} = validate_connector_subgraphs(subgraph_definitions)?;
let upgraded_subgraphs = self
.experimental_upgrade_subgraphs(connected_subgraphs)
.await?;
let merge_result = self
.experimental_merge_subgraphs(upgraded_subgraphs)
.await?;
// As in `compose`, override errors are checked after merging and are fatal.
let override_errors = validate_overrides(parsed_subgraphs);
if !override_errors.is_empty() {
return Err(override_errors);
}
let supergraph_sdl = merge_result.supergraph.clone();
let expansion_result = match expand_connectors(&supergraph_sdl, &Default::default()) {
Ok(result) => result,
Err(err) => {
return Err(vec![err.into()]);
}
};
match expansion_result {
ExpansionResult::Expanded {
raw_sdl,
connectors: Connectors {
by_service_name, ..
},
..
} => {
// Satisfiability is checked against the expanded SDL, but the SDL
// returned to the caller is the merged one.
self.experimental_validate_satisfiability(raw_sdl.as_str())
.await
.map(|s| {
// Merge hints followed by satisfiability hints.
let mut composition_hints = merge_result.hints;
composition_hints.extend(s);
// Connector hints go first and are not sanitized; composition
// hints follow with their messages sanitized.
let mut build_messages: Vec<_> =
connector_hints.into_iter().map(|h| h.into()).collect();
build_messages.extend(composition_hints.into_iter().map(|h| {
let mut issue = Into::<Issue>::into(h);
sanitize_connectors_issue(&mut issue, by_service_name.iter());
issue.into()
}));
PluginResult::new(Ok(supergraph_sdl), build_messages)
})
.map_err(|err| {
// Satisfiability errors get the same message sanitization.
err.into_iter()
.map(|mut issue| {
sanitize_connectors_issue(&mut issue, by_service_name.iter());
issue
})
.collect()
})
}
// NOTE(review): `connector_hints` is not added to the build messages on
// this path, unlike the `Expanded` arm above — confirm this is intended.
ExpansionResult::Unchanged => self
.experimental_validate_satisfiability(supergraph_sdl.as_str())
.await
.map(|s| {
let mut hints = merge_result.hints;
hints.extend(s);
let build_messages: Vec<_> = hints
.into_iter()
.map(|h| Into::<Issue>::into(h).into())
.collect();
PluginResult::new(Ok(supergraph_sdl), build_messages)
}),
}
}
/// Converts each definition into a `Subgraph<Initial>`, then expands the
/// subgraphs and upgrades them if necessary, returning them as definitions.
///
/// # Errors
/// Returns the issues of every definition that failed to convert (all are
/// collected before returning), or the errors reported by expansion/upgrade.
async fn experimental_upgrade_subgraphs(
&mut self,
subgraphs: Vec<SubgraphDefinition>,
) -> Result<Vec<SubgraphDefinition>, Vec<Issue>> {
let mut issues: Vec<Issue> = vec![];
let initial: Vec<Subgraph<Initial>> = subgraphs
.into_iter()
.map(|s| s.try_into())
// Failed conversions are dropped from the list and their issues are
// accumulated so that every failing subgraph is reported at once.
.filter_map(|r| {
r.map_err(|e: SubgraphError| issues.extend(convert_subgraph_error_to_issues(e)))
.ok()
})
.collect();
if !issues.is_empty() {
return Err(issues);
}
expand_subgraphs(initial)
.and_then(upgrade_subgraphs_if_necessary)
.map(|subgraphs| subgraphs.into_iter().map(|s| s.into()).collect())
.map_err(|errors| errors.into_iter().map(Issue::from).collect::<Vec<_>>())
}
/// Merges the given subgraphs into a supergraph, running the pre-merge and
/// post-merge validations around the merge itself.
///
/// Subgraphs are parsed with `assume_subgraph_validated`, i.e. they are
/// marked as expanded and validated without those steps being run here.
///
/// # Errors
/// Returns parse issues for every failing subgraph, or the errors of the
/// first failing step among pre-merge validation, merge and post-merge
/// validation.
async fn experimental_merge_subgraphs(
&mut self,
subgraphs: Vec<SubgraphDefinition>,
) -> Result<MergeResult, Vec<Issue>> {
let mut subgraph_errors = vec![];
let validated: Vec<Subgraph<Validated>> = subgraphs
.into_iter()
.map(assume_subgraph_validated)
// Same collect-all-failures approach as in `experimental_upgrade_subgraphs`.
.filter_map(|r| {
r.map_err(|e| subgraph_errors.extend(convert_subgraph_error_to_issues(e)))
.ok()
})
.collect();
if !subgraph_errors.is_empty() {
return Err(subgraph_errors);
}
pre_merge_validations(&validated)
.map_err(|errors| errors.into_iter().map(Issue::from).collect::<Vec<_>>())?;
let supergraph = merge_subgraphs(validated, &CompositionOptions::default())
.map_err(|errors| errors.into_iter().map(Issue::from).collect::<Vec<_>>())?;
post_merge_validations(&supergraph)
.map_err(|errors| errors.into_iter().map(Issue::from).collect::<Vec<_>>())?;
let hints = supergraph
.hints()
.iter()
.map(|hint| hint.clone().into())
.collect();
// The supergraph is handed back as SDL text together with the merge hints.
Ok(MergeResult {
supergraph: supergraph.schema().schema().to_string(),
hints,
})
}
/// Parses `supergraph_sdl` and validates its satisfiability with the default
/// composition options.
///
/// Returns the hints of the validated supergraph on success.
///
/// # Errors
/// Returns a single issue when the SDL cannot be parsed, or the
/// satisfiability errors converted to issues.
async fn experimental_validate_satisfiability(
&mut self,
supergraph_sdl: &str,
) -> Result<Vec<Issue>, Vec<Issue>> {
let supergraph = Supergraph::parse(supergraph_sdl).map_err(|e| vec![Issue::from(e)])?;
validate_satisfiability(supergraph, &CompositionOptions::default())
.map(|s| s.hints().iter().map(|h| h.clone().into()).collect())
.map_err(|errors| errors.into_iter().map(Issue::from).collect::<Vec<_>>())
}
}
/// Per-subgraph output of connector validation that is kept for the later
/// override check.
struct SubgraphSchema {
// Schema returned by connector validation for this subgraph.
schema: Schema,
// `has_connectors` flag reported by connector validation for this subgraph.
has_connectors: bool,
}
/// Successful outcome of `validate_connector_subgraphs`.
struct ConnectorsValidationResult {
// Input definitions, each with its SDL replaced by the transformed SDL
// returned by connector validation.
subgraphs: Vec<SubgraphDefinition>,
// Validation output per subgraph, keyed by subgraph name.
parsed_subgraphs: HashMap<String, SubgraphSchema>,
// Validation findings whose severity is not `Severity::Error`.
hints: Vec<Issue>,
}
fn validate_connector_subgraphs(
subgraph_definitions: Vec<SubgraphDefinition>,
) -> Result<ConnectorsValidationResult, Vec<Issue>> {
let mut subgraph_validation_errors = Vec::new();
let mut subgraph_validation_hints = Vec::new();
let mut parsed_schemas = HashMap::new();
let subgraph_definitions = subgraph_definitions
.into_iter()
.map(|mut subgraph| {
let ValidationResult {
errors,
has_connectors,
schema,
transformed,
} = validate(subgraph.sdl, &subgraph.name);
subgraph.sdl = transformed;
for error in errors {
let issue = Issue {
code: error.code.to_string(),
message: error.message,
locations: error
.locations
.into_iter()
.map(|range| SubgraphLocation {
subgraph: Some(subgraph.name.clone()),
range: Some(range),
})
.collect(),
severity: convert_severity(error.code.severity()),
};
if issue.severity == Severity::Error {
subgraph_validation_errors.push(issue);
} else {
subgraph_validation_hints.push(issue);
}
}
parsed_schemas.insert(
subgraph.name.clone(),
SubgraphSchema {
schema,
has_connectors,
},
);
subgraph
})
.collect();
if !subgraph_validation_errors.is_empty() {
return Err(subgraph_validation_errors);
}
Ok(ConnectorsValidationResult {
subgraphs: subgraph_definitions,
parsed_subgraphs: parsed_schemas,
hints: subgraph_validation_hints,
})
}
/// Reports every `@override` (or `@federation__override`) whose `from`
/// argument names a subgraph that has connectors.
///
/// `schemas` maps each subgraph name to its validated schema. Directives on
/// the fields of object, interface and input-object types are inspected;
/// directives whose `from` argument cannot be read as a string are skipped.
///
/// Returns one `OVERRIDE_ON_CONNECTOR` error per offending directive. Each
/// error is located in the subgraph that declares the directive, because the
/// reported range is measured in that subgraph's sources.
fn validate_overrides(schemas: HashMap<String, SubgraphSchema>) -> Vec<Issue> {
    let mut override_errors = Vec::new();
    for (subgraph_name, SubgraphSchema { schema, .. }) in &schemas {
        // Pairs every directive on every field of `$node` with the field's
        // `Type.field` coordinate. A macro is used because the three node
        // kinds are distinct types with the same `name`/`fields` shape.
        macro_rules! extract_directives {
            ($node:ident) => {
                $node
                    .fields
                    .iter()
                    .flat_map(|(name, field)| {
                        field
                            .directives
                            .iter()
                            .map(move |d| (format!("{}.{}", $node.name, name), d))
                    })
                    .collect::<Vec<_>>()
            };
        }
        let override_directives = schema
            .types
            .values()
            .flat_map(|v| match v {
                ExtendedType::Object(node) => extract_directives!(node),
                ExtendedType::Interface(node) => extract_directives!(node),
                ExtendedType::InputObject(node) => extract_directives!(node),
                // These kinds have no fields that could carry the directive.
                ExtendedType::Scalar(_) | ExtendedType::Union(_) | ExtendedType::Enum(_) => {
                    Vec::new()
                }
            })
            .filter(|(_, directive)| {
                directive.name == "override" || directive.name == "federation__override"
            });
        for (field, directive) in override_directives {
            let Ok(Some(overridden_subgraph_name)) = directive
                .argument_by_name("from", schema)
                .map(|node| node.as_str())
            else {
                continue;
            };
            if schemas
                .get(overridden_subgraph_name)
                .is_some_and(|schema| schema.has_connectors)
            {
                override_errors.push(Issue {
                    code: "OVERRIDE_ON_CONNECTOR".to_string(),
                    message: format!(
                        r#"Field "{field}" on subgraph "{subgraph_name}" is trying to override connector-enabled subgraph "{overridden_subgraph_name}", which is not yet supported. See https://go.apollo.dev/connectors/limitations#override-is-partially-unsupported"#,
                    ),
                    locations: vec![SubgraphLocation {
                        // The range comes from this subgraph's sources, so the
                        // location must name this subgraph, not the overridden one.
                        subgraph: Some(subgraph_name.clone()),
                        range: directive.line_column_range(&schema.sources),
                    }],
                    severity: Severity::Error,
                });
            }
        }
    }
    override_errors
}
/// Rewrites `issue.message` in place so that every occurrence of a key yielded
/// by `connector_subgraphs` is replaced with the `id.subgraph_name` of the
/// connector stored under that key.
///
/// Replacement is plain substring substitution, applied once per entry in
/// iteration order.
fn sanitize_connectors_issue<'a>(
    issue: &mut Issue,
    connector_subgraphs: impl Iterator<Item = (&'a Arc<str>, &'a Connector)>,
) {
    connector_subgraphs.for_each(|(keyed_name, connector)| {
        let needle: &str = keyed_name;
        let replacement = connector.id.subgraph_name.as_str();
        issue.message = issue.message.replace(needle, replacement);
    });
}
/// Validates the cache-tag directives of each subgraph, in order.
///
/// Every error reported for a subgraph becomes an error-severity `Issue`
/// located in that subgraph. If the validation call itself fails for a
/// subgraph, scanning stops there: that failure is not reported and the
/// remaining subgraphs are not checked, but issues gathered so far are kept.
///
/// # Errors
/// Returns the collected issues when at least one was found.
fn validate_cache_tag_in_subgraphs(
    subgraph_definitions: &[SubgraphDefinition],
) -> Result<(), Vec<Issue>> {
    let mut issues = Vec::new();

    for definition in subgraph_definitions {
        let Ok(outcome) =
            validate_cache_tag_directives(&definition.name, &definition.url, &definition.sdl)
        else {
            break;
        };

        issues.extend(outcome.errors.into_iter().map(|err| {
            let locations = err
                .locations()
                .iter()
                .cloned()
                .map(|range| SubgraphLocation {
                    subgraph: Some(definition.name.clone()),
                    range: Some(range),
                })
                .collect();
            Issue {
                code: err.code().to_string(),
                message: err.message().to_string(),
                locations,
                severity: Severity::Error,
            }
        }));
    }

    if issues.is_empty() {
        Ok(())
    } else {
        Err(issues)
    }
}
/// Borrowed supergraph SDL text, as returned by
/// `HybridComposition::compose_services_without_satisfiability`.
pub type SupergraphSdl<'a> = &'a str;
/// A supergraph SDL paired with a list of issues.
///
/// NOTE(review): not referenced elsewhere in the visible part of this file;
/// presumably describes a composition that produced an SDL alongside
/// issues — confirm against callers.
#[derive(Clone, Debug)]
pub struct PartialSuccess {
// Supergraph SDL text.
pub supergraph_sdl: String,
// Issues accompanying the SDL.
pub issues: Vec<Issue>,
}
/// Maps a connector-validation severity onto the composition `Severity`
/// carried by `Issue`.
///
/// The match is deliberately exhaustive so that a new validation severity
/// forces this mapping to be revisited.
fn convert_severity(severity: ValidationSeverity) -> Severity {
    match severity {
        ValidationSeverity::Warning => Severity::Warning,
        ValidationSeverity::Error => Severity::Error,
    }
}
/// Flattens a satisfiability result into an iterator over its issues.
///
/// Hints (`Ok`) and errors (`Err`) are both plain issue lists, so whichever
/// variant is present is simply unwrapped and iterated.
fn satisfiability_result_into_issues(
    result: Result<Vec<Issue>, Vec<Issue>>,
) -> impl Iterator<Item = Issue> {
    let (Ok(issues) | Err(issues)) = result;
    issues.into_iter()
}
fn assume_subgraph_validated(
definition: SubgraphDefinition,
) -> Result<Subgraph<Validated>, SubgraphError> {
Subgraph::parse(
definition.name.as_str(),
definition.url.as_str(),
definition.sdl.as_str(),
)
.and_then(|s| s.assume_expanded())
.map(|s| s.assume_validated())
}
/// Converts a `SubgraphError` into issues, one per composition error that the
/// subgraph error yields.
fn convert_subgraph_error_to_issues(error: SubgraphError) -> Vec<Issue> {
    let composition_errors = error.to_composition_errors();
    composition_errors.map(Into::<Issue>::into).collect()
}