1use apollo_compiler::{schema::ExtendedType, Schema};
2use apollo_federation::composition::{
3 expand_subgraphs, merge_subgraphs, post_merge_validations, pre_merge_validations,
4 upgrade_subgraphs_if_necessary, validate_satisfiability, Supergraph,
5};
6use apollo_federation::connectors::{
7 expand::{expand_connectors, Connectors, ExpansionResult},
8 validation::{validate, Severity as ValidationSeverity, ValidationResult},
9 Connector,
10};
11use apollo_federation::internal_composition_api::validate_cache_tag_directives;
12use apollo_federation::subgraph::typestate::{Initial, Subgraph, Validated};
13use apollo_federation::subgraph::SubgraphError;
14use apollo_federation_types::build_plugin::PluginResult;
15use apollo_federation_types::composition::{MergeResult, SubgraphLocation};
16use apollo_federation_types::{
17 composition::{Issue, Severity},
18 javascript::SubgraphDefinition,
19};
20use std::collections::HashMap;
21use std::iter::once;
22use std::sync::Arc;
23
24#[allow(async_fn_in_trait)]
30pub trait HybridComposition {
31 async fn compose_services_without_satisfiability(
34 &mut self,
35 subgraph_definitions: Vec<SubgraphDefinition>,
36 ) -> Option<SupergraphSdl<'_>>;
37
38 async fn validate_satisfiability(&mut self) -> Result<Vec<Issue>, Vec<Issue>>;
52
53 fn update_supergraph_sdl(&mut self, supergraph_sdl: String);
56
57 fn add_issues<Source: Iterator<Item = Issue>>(&mut self, issues: Source);
62
63 async fn compose(&mut self, subgraph_definitions: Vec<SubgraphDefinition>) {
77 if let Err(cache_tag_errors) = validate_cache_tag_in_subgraphs(&subgraph_definitions) {
79 self.add_issues(cache_tag_errors.into_iter());
80 return;
81 }
82
83 let ConnectorsValidationResult {
85 subgraphs,
86 parsed_subgraphs,
87 hints: connector_hints,
88 } = match validate_connector_subgraphs(subgraph_definitions) {
89 Ok(results) => results,
90 Err(errors) => {
91 self.add_issues(errors.into_iter());
92 return;
93 }
94 };
95 self.add_issues(connector_hints.into_iter());
96
97 let Some(supergraph_sdl) = self
98 .compose_services_without_satisfiability(subgraphs)
99 .await
100 else {
101 return;
102 };
103
104 let override_errors = validate_overrides(parsed_subgraphs);
107 if !override_errors.is_empty() {
108 self.add_issues(override_errors.into_iter());
109 return;
110 }
111
112 let expansion_result = match expand_connectors(supergraph_sdl, &Default::default()) {
113 Ok(result) => result,
114 Err(err) => {
115 self.add_issues(once(Issue {
116 code: "INTERNAL_ERROR".to_string(),
117 message: format!(
118 "Composition failed due to an internal error when expanding connectors, please report this: {err}"
119 ),
120 locations: vec![],
121 severity: Severity::Error,
122 }));
123 return;
124 }
125 };
126 match expansion_result {
127 ExpansionResult::Expanded {
128 raw_sdl,
129 connectors: Connectors {
130 by_service_name, ..
131 },
132 ..
133 } => {
134 let original_supergraph_sdl = supergraph_sdl.to_string();
135 self.update_supergraph_sdl(raw_sdl);
136 let satisfiability_result = self.validate_satisfiability().await;
137 self.add_issues(
138 satisfiability_result_into_issues(satisfiability_result).map(|mut issue| {
139 sanitize_connectors_issue(&mut issue, by_service_name.iter());
140 issue
141 }),
142 );
143
144 self.update_supergraph_sdl(original_supergraph_sdl);
145 }
146 ExpansionResult::Unchanged => {
147 let satisfiability_result = self.validate_satisfiability().await;
148 self.add_issues(satisfiability_result_into_issues(satisfiability_result));
149 }
150 }
151 }
152
153 async fn experimental_compose(
172 mut self,
173 subgraph_definitions: Vec<SubgraphDefinition>,
174 ) -> Result<PluginResult, Vec<Issue>>
175 where
176 Self: Sized,
177 {
178 validate_cache_tag_in_subgraphs(&subgraph_definitions)?;
180
181 let ConnectorsValidationResult {
186 subgraphs: connected_subgraphs,
187 parsed_subgraphs,
188 hints: connector_hints,
189 } = validate_connector_subgraphs(subgraph_definitions)?;
190
191 let upgraded_subgraphs = self
192 .experimental_upgrade_subgraphs(connected_subgraphs)
193 .await?;
194
195 let merge_result = self
197 .experimental_merge_subgraphs(upgraded_subgraphs)
198 .await?;
199
200 let override_errors = validate_overrides(parsed_subgraphs);
204 if !override_errors.is_empty() {
205 return Err(override_errors);
206 }
207
208 let supergraph_sdl = merge_result.supergraph.clone();
210 let expansion_result = match expand_connectors(&supergraph_sdl, &Default::default()) {
211 Ok(result) => result,
212 Err(err) => {
213 return Err(vec![err.into()]);
214 }
215 };
216
217 match expansion_result {
219 ExpansionResult::Expanded {
220 raw_sdl,
221 connectors: Connectors {
222 by_service_name, ..
223 },
224 ..
225 } => {
226 self.experimental_validate_satisfiability(raw_sdl.as_str())
227 .await
228 .map(|s| {
229 let mut composition_hints = merge_result.hints;
230 composition_hints.extend(s);
231
232 let mut build_messages: Vec<_> =
233 connector_hints.into_iter().map(|h| h.into()).collect();
234 build_messages.extend(composition_hints.into_iter().map(|h| {
235 let mut issue = Into::<Issue>::into(h);
236 sanitize_connectors_issue(&mut issue, by_service_name.iter());
237 issue.into()
238 }));
239 PluginResult::new(Ok(supergraph_sdl), build_messages)
241 })
242 .map_err(|err| {
243 err.into_iter()
244 .map(|mut issue| {
245 sanitize_connectors_issue(&mut issue, by_service_name.iter());
246 issue
247 })
248 .collect()
249 })
250 }
251 ExpansionResult::Unchanged => self
252 .experimental_validate_satisfiability(supergraph_sdl.as_str())
253 .await
254 .map(|s| {
255 let mut hints = merge_result.hints;
256 hints.extend(s);
257
258 let build_messages: Vec<_> = hints
259 .into_iter()
260 .map(|h| Into::<Issue>::into(h).into())
261 .collect();
262 PluginResult::new(Ok(supergraph_sdl), build_messages)
263 }),
264 }
265 }
266
267 async fn experimental_upgrade_subgraphs(
275 &mut self,
276 subgraphs: Vec<SubgraphDefinition>,
277 ) -> Result<Vec<SubgraphDefinition>, Vec<Issue>> {
278 let mut issues: Vec<Issue> = vec![];
279 let initial: Vec<Subgraph<Initial>> = subgraphs
280 .into_iter()
281 .map(|s| s.try_into())
282 .filter_map(|r| {
283 r.map_err(|e: SubgraphError| issues.extend(convert_subgraph_error_to_issues(e)))
284 .ok()
285 })
286 .collect();
287 if !issues.is_empty() {
288 return Err(issues);
289 }
290 expand_subgraphs(initial)
291 .and_then(upgrade_subgraphs_if_necessary)
292 .map(|subgraphs| subgraphs.into_iter().map(|s| s.into()).collect())
293 .map_err(|errors| errors.into_iter().map(Issue::from).collect::<Vec<_>>())
294 }
295
296 async fn experimental_merge_subgraphs(
298 &mut self,
299 subgraphs: Vec<SubgraphDefinition>,
300 ) -> Result<MergeResult, Vec<Issue>> {
301 let mut subgraph_errors = vec![];
302 let validated: Vec<Subgraph<Validated>> = subgraphs
303 .into_iter()
304 .map(assume_subgraph_validated)
305 .filter_map(|r| {
306 r.map_err(|e| subgraph_errors.extend(convert_subgraph_error_to_issues(e)))
307 .ok()
308 })
309 .collect();
310 if !subgraph_errors.is_empty() {
311 return Err(subgraph_errors);
313 }
314 pre_merge_validations(&validated)
315 .map_err(|errors| errors.into_iter().map(Issue::from).collect::<Vec<_>>())?;
316 let supergraph = merge_subgraphs(validated)
317 .map_err(|errors| errors.into_iter().map(Issue::from).collect::<Vec<_>>())?;
318 post_merge_validations(&supergraph)
319 .map_err(|errors| errors.into_iter().map(Issue::from).collect::<Vec<_>>())?;
320 let hints = supergraph
321 .hints()
322 .iter()
323 .map(|hint| hint.clone().into())
324 .collect();
325 Ok(MergeResult {
326 supergraph: supergraph.schema().to_string(),
327 hints,
328 })
329 }
330
331 async fn experimental_validate_satisfiability(
333 &mut self,
334 supergraph_sdl: &str,
335 ) -> Result<Vec<Issue>, Vec<Issue>> {
336 let supergraph = Supergraph::parse(supergraph_sdl).map_err(|e| vec![Issue::from(e)])?;
337 validate_satisfiability(supergraph)
338 .map(|s| s.hints().iter().map(|h| h.clone().into()).collect())
339 .map_err(|errors| errors.into_iter().map(Issue::from).collect::<Vec<_>>())
340 }
341}
342
343struct SubgraphSchema {
344 schema: Schema,
345 has_connectors: bool,
346}
347
348struct ConnectorsValidationResult {
349 subgraphs: Vec<SubgraphDefinition>,
350 parsed_subgraphs: HashMap<String, SubgraphSchema>,
351 hints: Vec<Issue>,
352}
353fn validate_connector_subgraphs(
355 subgraph_definitions: Vec<SubgraphDefinition>,
356) -> Result<ConnectorsValidationResult, Vec<Issue>> {
357 let mut subgraph_validation_errors = Vec::new();
358 let mut subgraph_validation_hints = Vec::new();
359 let mut parsed_schemas = HashMap::new();
360 let subgraph_definitions = subgraph_definitions
361 .into_iter()
362 .map(|mut subgraph| {
363 let ValidationResult {
364 errors,
365 has_connectors,
366 schema,
367 transformed,
368 } = validate(subgraph.sdl, &subgraph.name);
369 subgraph.sdl = transformed;
370 for error in errors {
371 let issue = Issue {
372 code: error.code.to_string(),
373 message: error.message,
374 locations: error
375 .locations
376 .into_iter()
377 .map(|range| SubgraphLocation {
378 subgraph: Some(subgraph.name.clone()),
379 range: Some(range),
380 })
381 .collect(),
382 severity: convert_severity(error.code.severity()),
383 };
384 if issue.severity == Severity::Error {
385 subgraph_validation_errors.push(issue);
386 } else {
387 subgraph_validation_hints.push(issue);
388 }
389 }
390 parsed_schemas.insert(
391 subgraph.name.clone(),
392 SubgraphSchema {
393 schema,
394 has_connectors,
395 },
396 );
397 subgraph
398 })
399 .collect();
400
401 if !subgraph_validation_errors.is_empty() {
402 return Err(subgraph_validation_errors);
403 }
404 Ok(ConnectorsValidationResult {
405 subgraphs: subgraph_definitions,
406 parsed_subgraphs: parsed_schemas,
407 hints: subgraph_validation_hints,
408 })
409}
410
411fn validate_overrides(schemas: HashMap<String, SubgraphSchema>) -> Vec<Issue> {
418 let mut override_errors = Vec::new();
419 for (subgraph_name, SubgraphSchema { schema, .. }) in &schemas {
420 macro_rules! extract_directives {
423 ($node:ident) => {
424 $node
425 .fields
426 .iter()
427 .flat_map(|(name, field)| {
428 field
429 .directives
430 .iter()
431 .map(move |d| (format!("{}.{}", $node.name, name), d))
432 })
433 .collect::<Vec<_>>()
434 };
435 }
436
437 let override_directives = schema
438 .types
439 .values()
440 .flat_map(|v| match v {
441 ExtendedType::Object(node) => extract_directives!(node),
442 ExtendedType::Interface(node) => extract_directives!(node),
443 ExtendedType::InputObject(node) => extract_directives!(node),
444
445 ExtendedType::Scalar(_) | ExtendedType::Union(_) | ExtendedType::Enum(_) => {
447 Vec::new()
448 }
449 })
450 .filter(|(_, directive)| {
451 directive.name == "override" || directive.name == "federation__override"
454 });
455
456 for (field, directive) in override_directives {
458 let Ok(Some(overridden_subgraph_name)) = directive
461 .argument_by_name("from", schema)
462 .map(|node| node.as_str())
463 else {
464 continue;
465 };
466
467 if schemas
468 .get(overridden_subgraph_name)
469 .is_some_and(|schema| schema.has_connectors)
470 {
471 override_errors.push(Issue {
472 code: "OVERRIDE_ON_CONNECTOR".to_string(),
473 message: format!(
474 r#"Field "{field}" on subgraph "{subgraph_name}" is trying to override connector-enabled subgraph "{overridden_subgraph_name}", which is not yet supported. See https://go.apollo.dev/connectors/limitations#override-is-partially-unsupported"#,
475 ),
476 locations: vec![SubgraphLocation {
477 subgraph: Some(String::from(overridden_subgraph_name)),
478 range: directive.line_column_range(&schema.sources),
479 }],
480 severity: Severity::Error,
481 });
482 }
483 }
484 }
485
486 override_errors
487}
488
489fn sanitize_connectors_issue<'a>(
490 issue: &mut Issue,
491 connector_subgraphs: impl Iterator<Item = (&'a Arc<str>, &'a Connector)>,
492) {
493 for (service_name, connector) in connector_subgraphs {
494 issue.message = issue
495 .message
496 .replace(&**service_name, connector.id.subgraph_name.as_str());
497 }
498}
499
500fn validate_cache_tag_in_subgraphs(
501 subgraph_definitions: &[SubgraphDefinition],
502) -> Result<(), Vec<Issue>> {
503 let mut issues = Vec::new();
504 for subgraph_def in subgraph_definitions {
505 match validate_cache_tag_directives(
506 &subgraph_def.name,
507 &subgraph_def.url,
508 &subgraph_def.sdl,
509 ) {
510 Err(_err) => {
511 break;
514 }
515 Ok(res) => {
516 if !res.errors.is_empty() {
517 issues.extend(res.errors.into_iter().map(|err| {
518 Issue {
519 code: err.code(),
520 message: err.message(),
521 locations: err
522 .locations
523 .into_iter()
524 .map(|range| SubgraphLocation {
525 subgraph: Some(subgraph_def.name.clone()),
526 range: Some(range),
527 })
528 .collect(),
529 severity: Severity::Error,
530 }
531 }));
532 }
533 }
534 }
535 }
536 if !issues.is_empty() {
537 Err(issues)
538 } else {
539 Ok(())
540 }
541}
542
543pub type SupergraphSdl<'a> = &'a str;
544
545#[derive(Clone, Debug)]
547pub struct PartialSuccess {
548 pub supergraph_sdl: String,
549 pub issues: Vec<Issue>,
550}
551
552fn convert_severity(severity: ValidationSeverity) -> Severity {
553 match severity {
554 ValidationSeverity::Error => Severity::Error,
555 ValidationSeverity::Warning => Severity::Warning,
556 }
557}
558
559fn satisfiability_result_into_issues(
560 result: Result<Vec<Issue>, Vec<Issue>>,
561) -> impl Iterator<Item = Issue> {
562 match result {
563 Ok(hints) => hints.into_iter(),
564 Err(errors) => errors.into_iter(),
565 }
566}
567
568fn assume_subgraph_validated(
571 definition: SubgraphDefinition,
572) -> Result<Subgraph<Validated>, SubgraphError> {
573 Subgraph::parse(
574 definition.name.as_str(),
575 definition.url.as_str(),
576 definition.sdl.as_str(),
577 )
578 .and_then(|s| s.assume_expanded())
579 .map(|s| s.assume_validated())
580}
581
582fn convert_subgraph_error_to_issues(error: SubgraphError) -> Vec<Issue> {
583 error
584 .to_composition_errors()
585 .map(|err| err.into())
586 .collect()
587}