1use apollo_compiler::{schema::ExtendedType, Schema};
2use apollo_federation::composition::{
3 expand_subgraphs, merge_subgraphs, post_merge_validations, pre_merge_validations,
4 upgrade_subgraphs_if_necessary, validate_satisfiability, validate_subgraphs, Supergraph,
5};
6use apollo_federation::connectors::{
7 expand::{expand_connectors, Connectors, ExpansionResult},
8 validation::{validate, Severity as ValidationSeverity, ValidationResult},
9 Connector,
10};
11use apollo_federation::subgraph::typestate::{Initial, Subgraph, Upgraded, Validated};
12use apollo_federation::subgraph::SubgraphError;
13use apollo_federation_types::build_plugin::{BuildMessage, PluginResult};
14use apollo_federation_types::composition::{convert_subraph_error_to_issues, SubgraphLocation};
15use apollo_federation_types::javascript::{CompositionHint, HintCodeDefinition, MergeResult};
16use apollo_federation_types::{
17 composition::{Issue, Severity},
18 javascript::{SatisfiabilityResult, SubgraphDefinition},
19};
20use either::Either;
21use std::collections::HashMap;
22use std::iter::once;
23use std::sync::Arc;
24
25#[allow(async_fn_in_trait)]
31pub trait HybridComposition {
32 async fn compose_services_without_satisfiability(
35 &mut self,
36 subgraph_definitions: Vec<SubgraphDefinition>,
37 ) -> Option<SupergraphSdl>;
38
39 async fn validate_satisfiability(&mut self) -> Result<SatisfiabilityResult, Issue>;
53
54 fn update_supergraph_sdl(&mut self, supergraph_sdl: String);
57
58 fn add_issues<Source: Iterator<Item = Issue>>(&mut self, issues: Source);
63
64 async fn compose(&mut self, subgraph_definitions: Vec<SubgraphDefinition>) {
78 let ConnectorsValidationResult {
80 subgraphs,
81 parsed_subgraphs,
82 hints: connector_hints,
83 } = match validate_connector_subgraphs(subgraph_definitions) {
84 Ok(results) => results,
85 Err(errors) => {
86 self.add_issues(errors.into_iter());
87 return;
88 }
89 };
90 self.add_issues(connector_hints.into_iter());
91
92 let Some(supergraph_sdl) = self
93 .compose_services_without_satisfiability(subgraphs)
94 .await
95 else {
96 return;
97 };
98
99 let override_errors = validate_overrides(parsed_subgraphs);
102 if !override_errors.is_empty() {
103 self.add_issues(override_errors.into_iter());
104 return;
105 }
106
107 let expansion_result = match expand_connectors(supergraph_sdl, &Default::default()) {
108 Ok(result) => result,
109 Err(err) => {
110 self.add_issues(once(Issue {
111 code: "INTERNAL_ERROR".to_string(),
112 message: format!(
113 "Composition failed due to an internal error, please report this: {err}"
114 ),
115 locations: vec![],
116 severity: Severity::Error,
117 }));
118 return;
119 }
120 };
121 match expansion_result {
122 ExpansionResult::Expanded {
123 raw_sdl,
124 connectors: Connectors {
125 by_service_name, ..
126 },
127 ..
128 } => {
129 let original_supergraph_sdl = supergraph_sdl.to_string();
130 self.update_supergraph_sdl(raw_sdl);
131 let satisfiability_result = self.validate_satisfiability().await;
132 self.add_issues(
133 satisfiability_result_into_issues(satisfiability_result).map(|mut issue| {
134 sanitize_connectors_issue(&mut issue, by_service_name.iter());
135 issue
136 }),
137 );
138
139 self.update_supergraph_sdl(original_supergraph_sdl);
140 }
141 ExpansionResult::Unchanged => {
142 let satisfiability_result = self.validate_satisfiability().await;
143 self.add_issues(satisfiability_result_into_issues(satisfiability_result));
144 }
145 }
146 }
147
148 async fn experimental_compose(
167 mut self,
168 subgraph_definitions: Vec<SubgraphDefinition>,
169 ) -> Result<PluginResult, Vec<Issue>>
170 where
171 Self: Sized,
172 {
173 let upgraded_subgraphs = self
174 .experimental_upgrade_subgraphs(subgraph_definitions)
175 .await?;
176 let validated_subgraphs = self
177 .experimental_validate_subgraphs(upgraded_subgraphs)
178 .await?;
179
180 let ConnectorsValidationResult {
184 subgraphs: connected_subgraphs,
185 parsed_subgraphs,
186 hints: connector_hints,
187 } = validate_connector_subgraphs(validated_subgraphs)?;
188 let override_errors = validate_overrides(parsed_subgraphs);
189 if !override_errors.is_empty() {
190 return Err(override_errors);
191 }
192
193 let merge_result = self
195 .experimental_merge_subgraphs(connected_subgraphs)
196 .await?;
197
198 let supergraph_sdl = merge_result.supergraph.clone();
200 let expansion_result = match expand_connectors(&supergraph_sdl, &Default::default()) {
201 Ok(result) => result,
202 Err(err) => {
203 return Err(vec![err.into()]);
204 }
205 };
206
207 match expansion_result {
209 ExpansionResult::Expanded {
210 raw_sdl,
211 connectors: Connectors {
212 by_service_name, ..
213 },
214 ..
215 } => {
216 self.experimental_validate_satisfiability(raw_sdl.as_str())
217 .await
218 .map(|s| {
219 let mut composition_hints = merge_result.hints;
220 composition_hints.extend(s);
221
222 let mut build_messages: Vec<BuildMessage> =
223 connector_hints.into_iter().map(|h| h.into()).collect();
224 build_messages.extend(composition_hints.into_iter().map(|h| {
225 let mut issue = Into::<Issue>::into(h);
226 sanitize_connectors_issue(&mut issue, by_service_name.iter());
227 issue.into()
228 }));
229 PluginResult::new(Ok(supergraph_sdl), build_messages)
231 })
232 .map_err(|err| {
233 err.into_iter()
234 .map(|mut issue| {
235 sanitize_connectors_issue(&mut issue, by_service_name.iter());
236 issue
237 })
238 .collect()
239 })
240 }
241 ExpansionResult::Unchanged => self
242 .experimental_validate_satisfiability(supergraph_sdl.as_str())
243 .await
244 .map(|s| {
245 let mut hints = merge_result.hints;
246 hints.extend(s);
247
248 let build_messages: Vec<BuildMessage> = hints
249 .into_iter()
250 .map(|h| Into::<Issue>::into(h).into())
251 .collect();
252 PluginResult::new(Ok(supergraph_sdl), build_messages)
253 }),
254 }
255 }
256
257 async fn experimental_upgrade_subgraphs(
264 &mut self,
265 subgraphs: Vec<SubgraphDefinition>,
266 ) -> Result<Vec<SubgraphDefinition>, Vec<Issue>> {
267 let mut issues: Vec<Issue> = vec![];
268 let initial: Vec<Subgraph<Initial>> = subgraphs
269 .into_iter()
270 .map(|s| s.try_into())
271 .filter_map(|r| {
272 r.map_err(|e: SubgraphError| issues.extend(convert_subraph_error_to_issues(e)))
273 .ok()
274 })
275 .collect();
276 if !issues.is_empty() {
277 return Err(issues);
278 }
279 expand_subgraphs(initial)
280 .and_then(upgrade_subgraphs_if_necessary)
281 .map(|subgraphs| subgraphs.into_iter().map(|s| s.into()).collect())
282 .map_err(|errors| errors.into_iter().map(Issue::from).collect::<Vec<_>>())
283 }
284
285 async fn experimental_validate_subgraphs(
287 &mut self,
288 subgraphs: Vec<SubgraphDefinition>,
289 ) -> Result<Vec<SubgraphDefinition>, Vec<Issue>> {
290 let mut issues = vec![];
291 let upgraded: Vec<Subgraph<Upgraded>> = subgraphs
292 .into_iter()
293 .map(assume_subgraph_upgraded)
294 .filter_map(|r| {
295 r.map_err(|e| issues.extend(convert_subraph_error_to_issues(e)))
296 .ok()
297 })
298 .collect();
299 if !issues.is_empty() {
300 return Err(issues);
302 }
303 validate_subgraphs(upgraded)
304 .map(|subgraphs| subgraphs.into_iter().map(|s| s.into()).collect())
305 .map_err(|errors| errors.into_iter().map(Issue::from).collect::<Vec<_>>())
306 }
307
308 async fn experimental_merge_subgraphs(
309 &mut self,
310 subgraphs: Vec<SubgraphDefinition>,
311 ) -> Result<MergeResult, Vec<Issue>> {
312 let mut subgraph_errors = vec![];
313 let validated: Vec<Subgraph<Validated>> = subgraphs
314 .into_iter()
315 .map(assume_subgraph_validated)
316 .filter_map(|r| {
317 r.map_err(|e| subgraph_errors.extend(convert_subraph_error_to_issues(e)))
318 .ok()
319 })
320 .collect();
321 if !subgraph_errors.is_empty() {
322 return Err(subgraph_errors);
324 }
325 pre_merge_validations(&validated)
326 .map_err(|errors| errors.into_iter().map(Issue::from).collect::<Vec<_>>())?;
327 let supergraph = merge_subgraphs(validated)
328 .map_err(|errors| errors.into_iter().map(Issue::from).collect::<Vec<_>>())?;
329 post_merge_validations(&supergraph)
330 .map_err(|errors| errors.into_iter().map(Issue::from).collect::<Vec<_>>())?;
331 let hints = supergraph
332 .hints()
333 .iter()
334 .map(|h| CompositionHint {
335 message: h.message.clone(),
336 definition: HintCodeDefinition {
337 code: h.code.clone(),
338 },
339 nodes: None,
340 })
341 .collect();
342 Ok(MergeResult {
343 supergraph: supergraph.schema().to_string(),
344 hints,
345 })
346 }
347
348 async fn experimental_validate_satisfiability(
349 &mut self,
350 supergraph_sdl: &str,
351 ) -> Result<Vec<CompositionHint>, Vec<Issue>> {
352 let supergraph = Supergraph::parse(supergraph_sdl).map_err(|e| vec![Issue::from(e)])?;
353 validate_satisfiability(supergraph)
354 .map(|s| {
355 s.hints()
356 .iter()
357 .map(|h| CompositionHint {
358 message: h.message.clone(),
359 definition: HintCodeDefinition {
360 code: h.code.clone(),
361 },
362 nodes: None,
363 })
364 .collect()
365 })
366 .map_err(|errors| errors.into_iter().map(Issue::from).collect::<Vec<_>>())
367 }
368}
369
/// Per-subgraph output of connectors validation that is kept for later
/// `@override` checks.
struct SubgraphSchema {
    /// Schema returned by connectors validation for this subgraph.
    schema: Schema,
    /// `has_connectors` flag reported by connectors validation for this subgraph.
    has_connectors: bool,
}
374
/// Successful outcome of `validate_connector_subgraphs`.
struct ConnectorsValidationResult {
    /// The input subgraph definitions, each with its SDL replaced by the
    /// `transformed` SDL returned by connectors validation.
    subgraphs: Vec<SubgraphDefinition>,
    /// Validation schema and connectors flag for each subgraph, keyed by subgraph name.
    parsed_subgraphs: HashMap<String, SubgraphSchema>,
    /// Validation findings whose severity is not `Severity::Error`.
    hints: Vec<Issue>,
}
380fn validate_connector_subgraphs(
382 subgraph_definitions: Vec<SubgraphDefinition>,
383) -> Result<ConnectorsValidationResult, Vec<Issue>> {
384 let mut subgraph_validation_errors = Vec::new();
385 let mut subgraph_validation_hints = Vec::new();
386 let mut parsed_schemas = HashMap::new();
387 let subgraph_definitions = subgraph_definitions
388 .into_iter()
389 .map(|mut subgraph| {
390 let ValidationResult {
391 errors,
392 has_connectors,
393 schema,
394 transformed,
395 } = validate(subgraph.sdl, &subgraph.name);
396 subgraph.sdl = transformed;
397 for error in errors {
398 let issue = Issue {
399 code: error.code.to_string(),
400 message: error.message,
401 locations: error
402 .locations
403 .into_iter()
404 .map(|range| SubgraphLocation {
405 subgraph: Some(subgraph.name.clone()),
406 range: Some(range),
407 })
408 .collect(),
409 severity: convert_severity(error.code.severity()),
410 };
411 if issue.severity == Severity::Error {
412 subgraph_validation_errors.push(issue);
413 } else {
414 subgraph_validation_hints.push(issue);
415 }
416 }
417 parsed_schemas.insert(
418 subgraph.name.clone(),
419 SubgraphSchema {
420 schema,
421 has_connectors,
422 },
423 );
424 subgraph
425 })
426 .collect();
427
428 if !subgraph_validation_errors.is_empty() {
429 return Err(subgraph_validation_errors);
430 }
431 Ok(ConnectorsValidationResult {
432 subgraphs: subgraph_definitions,
433 parsed_subgraphs: parsed_schemas,
434 hints: subgraph_validation_hints,
435 })
436}
437
438fn validate_overrides(schemas: HashMap<String, SubgraphSchema>) -> Vec<Issue> {
445 let mut override_errors = Vec::new();
446 for (subgraph_name, SubgraphSchema { schema, .. }) in &schemas {
447 macro_rules! extract_directives {
450 ($node:ident) => {
451 $node
452 .fields
453 .iter()
454 .flat_map(|(name, field)| {
455 field
456 .directives
457 .iter()
458 .map(move |d| (format!("{}.{}", $node.name, name), d))
459 })
460 .collect::<Vec<_>>()
461 };
462 }
463
464 let override_directives = schema
465 .types
466 .values()
467 .flat_map(|v| match v {
468 ExtendedType::Object(node) => extract_directives!(node),
469 ExtendedType::Interface(node) => extract_directives!(node),
470 ExtendedType::InputObject(node) => extract_directives!(node),
471
472 ExtendedType::Scalar(_) | ExtendedType::Union(_) | ExtendedType::Enum(_) => {
474 Vec::new()
475 }
476 })
477 .filter(|(_, directive)| {
478 directive.name == "override" || directive.name == "federation__override"
481 });
482
483 for (field, directive) in override_directives {
485 let Ok(Some(overridden_subgraph_name)) = directive
488 .argument_by_name("from", schema)
489 .map(|node| node.as_str())
490 else {
491 continue;
492 };
493
494 if schemas
495 .get(overridden_subgraph_name)
496 .is_some_and(|schema| schema.has_connectors)
497 {
498 override_errors.push(Issue {
499 code: "OVERRIDE_ON_CONNECTOR".to_string(),
500 message: format!(
501 r#"Field "{field}" on subgraph "{subgraph_name}" is trying to override connector-enabled subgraph "{overridden_subgraph_name}", which is not yet supported. See https://go.apollo.dev/connectors/limitations#override-is-partially-unsupported"#,
502 ),
503 locations: vec![SubgraphLocation {
504 subgraph: Some(String::from(overridden_subgraph_name)),
505 range: directive.line_column_range(&schema.sources),
506 }],
507 severity: Severity::Error,
508 });
509 }
510 }
511 }
512
513 override_errors
514}
515
516fn sanitize_connectors_issue<'a>(
517 issue: &mut Issue,
518 connector_subgraphs: impl Iterator<Item = (&'a Arc<str>, &'a Connector)>,
519) {
520 for (service_name, connector) in connector_subgraphs {
521 issue.message = issue
522 .message
523 .replace(&**service_name, connector.id.subgraph_name.as_str());
524 }
525}
526
/// Borrowed supergraph SDL text, as returned by
/// `HybridComposition::compose_services_without_satisfiability`.
pub type SupergraphSdl<'a> = &'a str;
528
/// A supergraph SDL paired with a list of issues.
///
/// NOTE(review): presumably describes a composition that produced a
/// supergraph while still reporting issues — confirm against callers, none
/// are visible in this part of the file.
#[derive(Clone, Debug)]
pub struct PartialSuccess {
    /// The supergraph SDL.
    pub supergraph_sdl: String,
    /// Issues accompanying the supergraph.
    pub issues: Vec<Issue>,
}
535
536fn convert_severity(severity: ValidationSeverity) -> Severity {
537 match severity {
538 ValidationSeverity::Error => Severity::Error,
539 ValidationSeverity::Warning => Severity::Warning,
540 }
541}
542
543fn satisfiability_result_into_issues(
544 satisfiability_result: Result<SatisfiabilityResult, Issue>,
545) -> Either<impl Iterator<Item = Issue>, impl Iterator<Item = Issue>> {
546 match satisfiability_result {
547 Ok(satisfiability_result) => Either::Left(
548 satisfiability_result
549 .errors
550 .into_iter()
551 .flatten()
552 .map(Issue::from)
553 .chain(
554 satisfiability_result
555 .hints
556 .into_iter()
557 .flatten()
558 .map(Issue::from),
559 ),
560 ),
561 Err(issue) => Either::Right(once(issue)),
562 }
563}
564
565fn assume_subgraph_upgraded(
567 definition: SubgraphDefinition,
568) -> Result<Subgraph<Upgraded>, SubgraphError> {
569 Subgraph::parse(
570 definition.name.as_str(),
571 definition.url.as_str(),
572 definition.sdl.as_str(),
573 )
574 .and_then(|s| s.assume_expanded())
575 .map(|s| s.assume_upgraded())
576}
577
578fn assume_subgraph_validated(
580 definition: SubgraphDefinition,
581) -> Result<Subgraph<Validated>, SubgraphError> {
582 assume_subgraph_upgraded(definition).and_then(|s| s.assume_validated())
583}