1use apollo_compiler::{schema::ExtendedType, Schema};
2use apollo_federation::composition::{
3 expand_subgraphs, merge_subgraphs, post_merge_validations, pre_merge_validations,
4 upgrade_subgraphs_if_necessary, validate_satisfiability, validate_subgraphs, Supergraph,
5};
6use apollo_federation::sources::connect::{
7 expand::{expand_connectors, Connectors, ExpansionResult},
8 validation::{validate, Severity as ValidationSeverity, ValidationResult},
9 Connector,
10};
11use apollo_federation::subgraph::typestate::{Initial, Subgraph, Upgraded, Validated};
12use apollo_federation::subgraph::SubgraphError;
13use apollo_federation_types::build_plugin::{BuildMessage, PluginResult};
14use apollo_federation_types::composition::{convert_subraph_error_to_issues, SubgraphLocation};
15use apollo_federation_types::javascript::{CompositionHint, HintCodeDefinition, MergeResult};
16use apollo_federation_types::{
17 composition::{Issue, Severity},
18 javascript::{SatisfiabilityResult, SubgraphDefinition},
19};
20use either::Either;
21use std::collections::HashMap;
22use std::iter::once;
23use std::sync::Arc;
24
25#[allow(async_fn_in_trait)]
31pub trait HybridComposition {
32 async fn compose_services_without_satisfiability(
35 &mut self,
36 subgraph_definitions: Vec<SubgraphDefinition>,
37 ) -> Option<SupergraphSdl>;
38
39 async fn validate_satisfiability(&mut self) -> Result<SatisfiabilityResult, Issue>;
53
54 fn update_supergraph_sdl(&mut self, supergraph_sdl: String);
57
58 fn add_issues<Source: Iterator<Item = Issue>>(&mut self, issues: Source);
63
64 async fn compose(&mut self, subgraph_definitions: Vec<SubgraphDefinition>) {
78 let ConnectorsValidationResult {
80 subgraphs,
81 parsed_subgraphs,
82 hints: connector_hints,
83 } = match validate_connector_subgraphs(subgraph_definitions) {
84 Ok(results) => results,
85 Err(errors) => {
86 self.add_issues(errors.into_iter());
87 return;
88 }
89 };
90 self.add_issues(connector_hints.into_iter());
91
92 let Some(supergraph_sdl) = self
93 .compose_services_without_satisfiability(subgraphs)
94 .await
95 else {
96 return;
97 };
98
99 let override_errors = validate_overrides(parsed_subgraphs);
102 if !override_errors.is_empty() {
103 self.add_issues(override_errors.into_iter());
104 return;
105 }
106
107 let expansion_result = match expand_connectors(supergraph_sdl, &Default::default()) {
108 Ok(result) => result,
109 Err(err) => {
110 self.add_issues(once(Issue {
111 code: "INTERNAL_ERROR".to_string(),
112 message: format!(
113 "Composition failed due to an internal error, please report this: {}",
114 err
115 ),
116 locations: vec![],
117 severity: Severity::Error,
118 }));
119 return;
120 }
121 };
122 match expansion_result {
123 ExpansionResult::Expanded {
124 raw_sdl,
125 connectors: Connectors {
126 by_service_name, ..
127 },
128 ..
129 } => {
130 let original_supergraph_sdl = supergraph_sdl.to_string();
131 self.update_supergraph_sdl(raw_sdl);
132 let satisfiability_result = self.validate_satisfiability().await;
133 self.add_issues(
134 satisfiability_result_into_issues(satisfiability_result).map(|mut issue| {
135 sanitize_connectors_issue(&mut issue, by_service_name.iter());
136 issue
137 }),
138 );
139
140 self.update_supergraph_sdl(original_supergraph_sdl);
141 }
142 ExpansionResult::Unchanged => {
143 let satisfiability_result = self.validate_satisfiability().await;
144 self.add_issues(satisfiability_result_into_issues(satisfiability_result));
145 }
146 }
147 }
148
149 async fn experimental_compose(
168 mut self,
169 subgraph_definitions: Vec<SubgraphDefinition>,
170 ) -> Result<PluginResult, Vec<Issue>>
171 where
172 Self: Sized,
173 {
174 let upgraded_subgraphs = self
175 .experimental_upgrade_subgraphs(subgraph_definitions)
176 .await?;
177 let validated_subgraphs = self
178 .experimental_validate_subgraphs(upgraded_subgraphs)
179 .await?;
180
181 let ConnectorsValidationResult {
185 subgraphs: connected_subgraphs,
186 parsed_subgraphs,
187 hints: connector_hints,
188 } = validate_connector_subgraphs(validated_subgraphs)?;
189 let override_errors = validate_overrides(parsed_subgraphs);
190 if !override_errors.is_empty() {
191 return Err(override_errors);
192 }
193
194 let merge_result = self
196 .experimental_merge_subgraphs(connected_subgraphs)
197 .await?;
198
199 let supergraph_sdl = merge_result.supergraph.clone();
201 let expansion_result = match expand_connectors(&supergraph_sdl, &Default::default()) {
202 Ok(result) => result,
203 Err(err) => {
204 return Err(vec![err.into()]);
205 }
206 };
207
208 match expansion_result {
210 ExpansionResult::Expanded {
211 raw_sdl,
212 connectors: Connectors {
213 by_service_name, ..
214 },
215 ..
216 } => {
217 self.experimental_validate_satisfiability(raw_sdl.as_str())
218 .await
219 .map(|s| {
220 let mut composition_hints = merge_result.hints;
221 composition_hints.extend(s);
222
223 let mut build_messages: Vec<BuildMessage> =
224 connector_hints.into_iter().map(|h| h.into()).collect();
225 build_messages.extend(composition_hints.into_iter().map(|h| {
226 let mut issue = Into::<Issue>::into(h);
227 sanitize_connectors_issue(&mut issue, by_service_name.iter());
228 issue.into()
229 }));
230 PluginResult::new(Ok(supergraph_sdl), build_messages)
232 })
233 .map_err(|err| {
234 err.into_iter()
235 .map(|mut issue| {
236 sanitize_connectors_issue(&mut issue, by_service_name.iter());
237 issue
238 })
239 .collect()
240 })
241 }
242 ExpansionResult::Unchanged => self
243 .experimental_validate_satisfiability(supergraph_sdl.as_str())
244 .await
245 .map(|s| {
246 let mut hints = merge_result.hints;
247 hints.extend(s);
248
249 let build_messages: Vec<BuildMessage> = hints
250 .into_iter()
251 .map(|h| Into::<Issue>::into(h).into())
252 .collect();
253 PluginResult::new(Ok(supergraph_sdl), build_messages)
254 }),
255 }
256 }
257
258 async fn experimental_upgrade_subgraphs(
265 &mut self,
266 subgraphs: Vec<SubgraphDefinition>,
267 ) -> Result<Vec<SubgraphDefinition>, Vec<Issue>> {
268 let mut issues: Vec<Issue> = vec![];
269 let initial: Vec<Subgraph<Initial>> = subgraphs
270 .into_iter()
271 .map(|s| s.try_into())
272 .filter_map(|r| {
273 r.map_err(|e: SubgraphError| issues.extend(convert_subraph_error_to_issues(e)))
274 .ok()
275 })
276 .collect();
277 if !issues.is_empty() {
278 return Err(issues);
279 }
280 expand_subgraphs(initial)
281 .and_then(upgrade_subgraphs_if_necessary)
282 .map(|subgraphs| subgraphs.into_iter().map(|s| s.into()).collect())
283 .map_err(|errors| errors.into_iter().map(Issue::from).collect::<Vec<_>>())
284 }
285
286 async fn experimental_validate_subgraphs(
288 &mut self,
289 subgraphs: Vec<SubgraphDefinition>,
290 ) -> Result<Vec<SubgraphDefinition>, Vec<Issue>> {
291 let mut issues = vec![];
292 let upgraded: Vec<Subgraph<Upgraded>> = subgraphs
293 .into_iter()
294 .map(assume_subgraph_upgraded)
295 .filter_map(|r| {
296 r.map_err(|e| issues.extend(convert_subraph_error_to_issues(e)))
297 .ok()
298 })
299 .collect();
300 if !issues.is_empty() {
301 return Err(issues);
303 }
304 validate_subgraphs(upgraded)
305 .map(|subgraphs| subgraphs.into_iter().map(|s| s.into()).collect())
306 .map_err(|errors| errors.into_iter().map(Issue::from).collect::<Vec<_>>())
307 }
308
309 async fn experimental_merge_subgraphs(
310 &mut self,
311 subgraphs: Vec<SubgraphDefinition>,
312 ) -> Result<MergeResult, Vec<Issue>> {
313 let mut subgraph_errors = vec![];
314 let validated: Vec<Subgraph<Validated>> = subgraphs
315 .into_iter()
316 .map(assume_subgraph_validated)
317 .filter_map(|r| {
318 r.map_err(|e| subgraph_errors.extend(convert_subraph_error_to_issues(e)))
319 .ok()
320 })
321 .collect();
322 if !subgraph_errors.is_empty() {
323 return Err(subgraph_errors);
325 }
326 pre_merge_validations(&validated)
327 .map_err(|errors| errors.into_iter().map(Issue::from).collect::<Vec<_>>())?;
328 let supergraph = merge_subgraphs(validated)
329 .map_err(|errors| errors.into_iter().map(Issue::from).collect::<Vec<_>>())?;
330 post_merge_validations(&supergraph)
331 .map_err(|errors| errors.into_iter().map(Issue::from).collect::<Vec<_>>())?;
332 let hints = supergraph
333 .hints()
334 .iter()
335 .map(|h| CompositionHint {
336 message: h.message.clone(),
337 definition: HintCodeDefinition {
338 code: h.code.clone(),
339 },
340 nodes: None,
341 })
342 .collect();
343 Ok(MergeResult {
344 supergraph: supergraph.schema().to_string(),
345 hints,
346 })
347 }
348
349 async fn experimental_validate_satisfiability(
350 &mut self,
351 supergraph_sdl: &str,
352 ) -> Result<Vec<CompositionHint>, Vec<Issue>> {
353 let supergraph = Supergraph::parse(supergraph_sdl).map_err(|e| vec![Issue::from(e)])?;
354 validate_satisfiability(supergraph)
355 .map(|s| {
356 s.hints()
357 .iter()
358 .map(|h| CompositionHint {
359 message: h.message.clone(),
360 definition: HintCodeDefinition {
361 code: h.code.clone(),
362 },
363 nodes: None,
364 })
365 .collect()
366 })
367 .map_err(|errors| errors.into_iter().map(Issue::from).collect::<Vec<_>>())
368 }
369}
370
371struct SubgraphSchema {
372 schema: Schema,
373 has_connectors: bool,
374}
375
376struct ConnectorsValidationResult {
377 subgraphs: Vec<SubgraphDefinition>,
378 parsed_subgraphs: HashMap<String, SubgraphSchema>,
379 hints: Vec<Issue>,
380}
381fn validate_connector_subgraphs(
383 subgraph_definitions: Vec<SubgraphDefinition>,
384) -> Result<ConnectorsValidationResult, Vec<Issue>> {
385 let mut subgraph_validation_errors = Vec::new();
386 let mut subgraph_validation_hints = Vec::new();
387 let mut parsed_schemas = HashMap::new();
388 let subgraph_definitions = subgraph_definitions
389 .into_iter()
390 .map(|mut subgraph| {
391 let ValidationResult {
392 errors,
393 has_connectors,
394 schema,
395 transformed,
396 } = validate(subgraph.sdl, &subgraph.name);
397 subgraph.sdl = transformed;
398 for error in errors {
399 let issue = Issue {
400 code: error.code.to_string(),
401 message: error.message,
402 locations: error
403 .locations
404 .into_iter()
405 .map(|range| SubgraphLocation {
406 subgraph: Some(subgraph.name.clone()),
407 range: Some(range),
408 })
409 .collect(),
410 severity: convert_severity(error.code.severity()),
411 };
412 if issue.severity == Severity::Error {
413 subgraph_validation_errors.push(issue);
414 } else {
415 subgraph_validation_hints.push(issue);
416 }
417 }
418 parsed_schemas.insert(
419 subgraph.name.clone(),
420 SubgraphSchema {
421 schema,
422 has_connectors,
423 },
424 );
425 subgraph
426 })
427 .collect();
428
429 if !subgraph_validation_errors.is_empty() {
430 return Err(subgraph_validation_errors);
431 }
432 Ok(ConnectorsValidationResult {
433 subgraphs: subgraph_definitions,
434 parsed_subgraphs: parsed_schemas,
435 hints: subgraph_validation_hints,
436 })
437}
438
439fn validate_overrides(schemas: HashMap<String, SubgraphSchema>) -> Vec<Issue> {
446 let mut override_errors = Vec::new();
447 for (subgraph_name, SubgraphSchema { schema, .. }) in &schemas {
448 macro_rules! extract_directives {
451 ($node:ident) => {
452 $node
453 .fields
454 .iter()
455 .flat_map(|(name, field)| {
456 field
457 .directives
458 .iter()
459 .map(move |d| (format!("{}.{}", $node.name, name), d))
460 })
461 .collect::<Vec<_>>()
462 };
463 }
464
465 let override_directives = schema
466 .types
467 .values()
468 .flat_map(|v| match v {
469 ExtendedType::Object(node) => extract_directives!(node),
470 ExtendedType::Interface(node) => extract_directives!(node),
471 ExtendedType::InputObject(node) => extract_directives!(node),
472
473 ExtendedType::Scalar(_) | ExtendedType::Union(_) | ExtendedType::Enum(_) => {
475 Vec::new()
476 }
477 })
478 .filter(|(_, directive)| {
479 directive.name == "override" || directive.name == "federation__override"
482 });
483
484 for (field, directive) in override_directives {
486 let Ok(Some(overridden_subgraph_name)) = directive
489 .argument_by_name("from", schema)
490 .map(|node| node.as_str())
491 else {
492 continue;
493 };
494
495 if schemas
496 .get(overridden_subgraph_name)
497 .is_some_and(|schema| schema.has_connectors)
498 {
499 override_errors.push(Issue {
500 code: "OVERRIDE_ON_CONNECTOR".to_string(),
501 message: format!(
502 r#"Field "{}" on subgraph "{}" is trying to override connector-enabled subgraph "{}", which is not yet supported. See https://go.apollo.dev/connectors/limitations#override-is-partially-unsupported"#,
503 field,
504 subgraph_name,
505 overridden_subgraph_name,
506 ),
507 locations: vec![SubgraphLocation {
508 subgraph: Some(String::from(overridden_subgraph_name)),
509 range: directive.line_column_range(&schema.sources),
510 }],
511 severity: Severity::Error,
512 });
513 }
514 }
515 }
516
517 override_errors
518}
519
520fn sanitize_connectors_issue<'a>(
521 issue: &mut Issue,
522 connector_subgraphs: impl Iterator<Item = (&'a Arc<str>, &'a Connector)>,
523) {
524 for (service_name, connector) in connector_subgraphs {
525 issue.message = issue
526 .message
527 .replace(&**service_name, connector.id.subgraph_name.as_str());
528 }
529}
530
531pub type SupergraphSdl<'a> = &'a str;
532
533#[derive(Clone, Debug)]
535pub struct PartialSuccess {
536 pub supergraph_sdl: String,
537 pub issues: Vec<Issue>,
538}
539
540fn convert_severity(severity: ValidationSeverity) -> Severity {
541 match severity {
542 ValidationSeverity::Error => Severity::Error,
543 ValidationSeverity::Warning => Severity::Warning,
544 }
545}
546
547fn satisfiability_result_into_issues(
548 satisfiability_result: Result<SatisfiabilityResult, Issue>,
549) -> Either<impl Iterator<Item = Issue>, impl Iterator<Item = Issue>> {
550 match satisfiability_result {
551 Ok(satisfiability_result) => Either::Left(
552 satisfiability_result
553 .errors
554 .into_iter()
555 .flatten()
556 .map(Issue::from)
557 .chain(
558 satisfiability_result
559 .hints
560 .into_iter()
561 .flatten()
562 .map(Issue::from),
563 ),
564 ),
565 Err(issue) => Either::Right(once(issue)),
566 }
567}
568
569fn assume_subgraph_upgraded(
571 definition: SubgraphDefinition,
572) -> Result<Subgraph<Upgraded>, SubgraphError> {
573 Subgraph::parse(
574 definition.name.as_str(),
575 definition.url.as_str(),
576 definition.sdl.as_str(),
577 )
578 .and_then(|s| s.assume_expanded())
579 .map(|s| s.assume_upgraded())
580}
581
582fn assume_subgraph_validated(
584 definition: SubgraphDefinition,
585) -> Result<Subgraph<Validated>, SubgraphError> {
586 assume_subgraph_upgraded(definition).and_then(|s| s.assume_validated())
587}