1use apollo_compiler::{schema::ExtendedType, Schema};
2use apollo_federation::composition::{
3 expand_subgraphs, merge_subgraphs, post_merge_validations, pre_merge_validations,
4 upgrade_subgraphs_if_necessary, validate_satisfiability, Supergraph,
5};
6use apollo_federation::connectors::{
7 expand::{expand_connectors, Connectors, ExpansionResult},
8 validation::{validate, Severity as ValidationSeverity, ValidationResult},
9 Connector,
10};
11use apollo_federation::internal_composition_api::validate_cache_tag_directives;
12use apollo_federation::subgraph::typestate::{Initial, Subgraph, Validated};
13use apollo_federation::subgraph::SubgraphError;
14use apollo_federation_types::build_plugin::PluginResult;
15use apollo_federation_types::composition::{MergeResult, SubgraphLocation};
16use apollo_federation_types::{
17 composition::{Issue, Severity},
18 javascript::SubgraphDefinition,
19};
20use std::collections::HashMap;
21use std::iter::once;
22use std::sync::Arc;
23
24#[allow(async_fn_in_trait)]
30pub trait HybridComposition {
31 async fn compose_services_without_satisfiability(
34 &mut self,
35 subgraph_definitions: Vec<SubgraphDefinition>,
36 ) -> Option<SupergraphSdl<'_>>;
37
38 async fn validate_satisfiability(&mut self) -> Result<Vec<Issue>, Vec<Issue>>;
52
53 fn update_supergraph_sdl(&mut self, supergraph_sdl: String);
56
57 fn add_issues<Source: Iterator<Item = Issue>>(&mut self, issues: Source);
62
63 async fn compose(&mut self, subgraph_definitions: Vec<SubgraphDefinition>) {
77 let mut cache_tag_errors = Vec::new();
78 for subgraph_def in &subgraph_definitions {
79 match validate_cache_tag_directives(
80 &subgraph_def.name,
81 &subgraph_def.url,
82 &subgraph_def.sdl,
83 ) {
84 Err(err) => {
85 self.add_issues(once(Issue {
86 code: "INTERNAL_ERROR".to_string(),
87 message: format!(
88 "Composition failed due to an internal error when validating cache tag, please report this: {err}"
89 ),
90 locations: vec![],
91 severity: Severity::Error,
92 }));
93 return;
94 }
95 Ok(res) => {
96 if !res.errors.is_empty() {
97 cache_tag_errors.extend(res.errors.into_iter().map(|err| {
98 Issue {
99 code: err.code(),
100 message: err.message(),
101 locations: err
102 .locations
103 .into_iter()
104 .map(|range| SubgraphLocation {
105 subgraph: Some(subgraph_def.name.clone()),
106 range: Some(range),
107 })
108 .collect(),
109 severity: Severity::Error,
110 }
111 }));
112 }
113 }
114 }
115 }
116 if !cache_tag_errors.is_empty() {
117 self.add_issues(cache_tag_errors.into_iter());
118 return;
119 }
120
121 let ConnectorsValidationResult {
123 subgraphs,
124 parsed_subgraphs,
125 hints: connector_hints,
126 } = match validate_connector_subgraphs(subgraph_definitions) {
127 Ok(results) => results,
128 Err(errors) => {
129 self.add_issues(errors.into_iter());
130 return;
131 }
132 };
133 self.add_issues(connector_hints.into_iter());
134
135 let Some(supergraph_sdl) = self
136 .compose_services_without_satisfiability(subgraphs)
137 .await
138 else {
139 return;
140 };
141
142 let override_errors = validate_overrides(parsed_subgraphs);
145 if !override_errors.is_empty() {
146 self.add_issues(override_errors.into_iter());
147 return;
148 }
149
150 let expansion_result = match expand_connectors(supergraph_sdl, &Default::default()) {
151 Ok(result) => result,
152 Err(err) => {
153 self.add_issues(once(Issue {
154 code: "INTERNAL_ERROR".to_string(),
155 message: format!(
156 "Composition failed due to an internal error when expanding connectors, please report this: {err}"
157 ),
158 locations: vec![],
159 severity: Severity::Error,
160 }));
161 return;
162 }
163 };
164 match expansion_result {
165 ExpansionResult::Expanded {
166 raw_sdl,
167 connectors: Connectors {
168 by_service_name, ..
169 },
170 ..
171 } => {
172 let original_supergraph_sdl = supergraph_sdl.to_string();
173 self.update_supergraph_sdl(raw_sdl);
174 let satisfiability_result = self.validate_satisfiability().await;
175 self.add_issues(
176 satisfiability_result_into_issues(satisfiability_result).map(|mut issue| {
177 sanitize_connectors_issue(&mut issue, by_service_name.iter());
178 issue
179 }),
180 );
181
182 self.update_supergraph_sdl(original_supergraph_sdl);
183 }
184 ExpansionResult::Unchanged => {
185 let satisfiability_result = self.validate_satisfiability().await;
186 self.add_issues(satisfiability_result_into_issues(satisfiability_result));
187 }
188 }
189 }
190
191 async fn experimental_compose(
210 mut self,
211 subgraph_definitions: Vec<SubgraphDefinition>,
212 ) -> Result<PluginResult, Vec<Issue>>
213 where
214 Self: Sized,
215 {
216 let ConnectorsValidationResult {
221 subgraphs: connected_subgraphs,
222 parsed_subgraphs,
223 hints: connector_hints,
224 } = validate_connector_subgraphs(subgraph_definitions)?;
225
226 let upgraded_subgraphs = self
227 .experimental_upgrade_subgraphs(connected_subgraphs)
228 .await?;
229
230 let merge_result = self
232 .experimental_merge_subgraphs(upgraded_subgraphs)
233 .await?;
234
235 let override_errors = validate_overrides(parsed_subgraphs);
239 if !override_errors.is_empty() {
240 return Err(override_errors);
241 }
242
243 let supergraph_sdl = merge_result.supergraph.clone();
245 let expansion_result = match expand_connectors(&supergraph_sdl, &Default::default()) {
246 Ok(result) => result,
247 Err(err) => {
248 return Err(vec![err.into()]);
249 }
250 };
251
252 match expansion_result {
254 ExpansionResult::Expanded {
255 raw_sdl,
256 connectors: Connectors {
257 by_service_name, ..
258 },
259 ..
260 } => {
261 self.experimental_validate_satisfiability(raw_sdl.as_str())
262 .await
263 .map(|s| {
264 let mut composition_hints = merge_result.hints;
265 composition_hints.extend(s);
266
267 let mut build_messages: Vec<_> =
268 connector_hints.into_iter().map(|h| h.into()).collect();
269 build_messages.extend(composition_hints.into_iter().map(|h| {
270 let mut issue = Into::<Issue>::into(h);
271 sanitize_connectors_issue(&mut issue, by_service_name.iter());
272 issue.into()
273 }));
274 PluginResult::new(Ok(supergraph_sdl), build_messages)
276 })
277 .map_err(|err| {
278 err.into_iter()
279 .map(|mut issue| {
280 sanitize_connectors_issue(&mut issue, by_service_name.iter());
281 issue
282 })
283 .collect()
284 })
285 }
286 ExpansionResult::Unchanged => self
287 .experimental_validate_satisfiability(supergraph_sdl.as_str())
288 .await
289 .map(|s| {
290 let mut hints = merge_result.hints;
291 hints.extend(s);
292
293 let build_messages: Vec<_> = hints
294 .into_iter()
295 .map(|h| Into::<Issue>::into(h).into())
296 .collect();
297 PluginResult::new(Ok(supergraph_sdl), build_messages)
298 }),
299 }
300 }
301
302 async fn experimental_upgrade_subgraphs(
310 &mut self,
311 subgraphs: Vec<SubgraphDefinition>,
312 ) -> Result<Vec<SubgraphDefinition>, Vec<Issue>> {
313 let mut issues: Vec<Issue> = vec![];
314 let initial: Vec<Subgraph<Initial>> = subgraphs
315 .into_iter()
316 .map(|s| s.try_into())
317 .filter_map(|r| {
318 r.map_err(|e: SubgraphError| issues.extend(convert_subgraph_error_to_issues(e)))
319 .ok()
320 })
321 .collect();
322 if !issues.is_empty() {
323 return Err(issues);
324 }
325 expand_subgraphs(initial)
326 .and_then(upgrade_subgraphs_if_necessary)
327 .map(|subgraphs| subgraphs.into_iter().map(|s| s.into()).collect())
328 .map_err(|errors| errors.into_iter().map(Issue::from).collect::<Vec<_>>())
329 }
330
331 async fn experimental_merge_subgraphs(
333 &mut self,
334 subgraphs: Vec<SubgraphDefinition>,
335 ) -> Result<MergeResult, Vec<Issue>> {
336 let mut subgraph_errors = vec![];
337 let validated: Vec<Subgraph<Validated>> = subgraphs
338 .into_iter()
339 .map(assume_subgraph_validated)
340 .filter_map(|r| {
341 r.map_err(|e| subgraph_errors.extend(convert_subgraph_error_to_issues(e)))
342 .ok()
343 })
344 .collect();
345 if !subgraph_errors.is_empty() {
346 return Err(subgraph_errors);
348 }
349 pre_merge_validations(&validated)
350 .map_err(|errors| errors.into_iter().map(Issue::from).collect::<Vec<_>>())?;
351 let supergraph = merge_subgraphs(validated)
352 .map_err(|errors| errors.into_iter().map(Issue::from).collect::<Vec<_>>())?;
353 post_merge_validations(&supergraph)
354 .map_err(|errors| errors.into_iter().map(Issue::from).collect::<Vec<_>>())?;
355 let hints = supergraph
356 .hints()
357 .iter()
358 .map(|hint| hint.clone().into())
359 .collect();
360 Ok(MergeResult {
361 supergraph: supergraph.schema().to_string(),
362 hints,
363 })
364 }
365
366 async fn experimental_validate_satisfiability(
368 &mut self,
369 supergraph_sdl: &str,
370 ) -> Result<Vec<Issue>, Vec<Issue>> {
371 let supergraph = Supergraph::parse(supergraph_sdl).map_err(|e| vec![Issue::from(e)])?;
372 validate_satisfiability(supergraph)
373 .map(|s| s.hints().iter().map(|h| h.clone().into()).collect())
374 .map_err(|errors| errors.into_iter().map(Issue::from).collect::<Vec<_>>())
375 }
376}
377
/// Parsed form of one subgraph, as produced by connector validation in
/// `validate_connector_subgraphs`.
struct SubgraphSchema {
    /// The schema returned by connector validation for this subgraph.
    schema: Schema,
    /// The `has_connectors` flag returned by connector validation for this subgraph.
    has_connectors: bool,
}
382
/// Successful outcome of `validate_connector_subgraphs`.
struct ConnectorsValidationResult {
    /// The input definitions, each with its SDL replaced by the transformed SDL that
    /// connector validation returned.
    subgraphs: Vec<SubgraphDefinition>,
    /// Parsed schema information for every subgraph, keyed by subgraph name.
    parsed_subgraphs: HashMap<String, SubgraphSchema>,
    /// Validation findings whose severity is not `Severity::Error`.
    hints: Vec<Issue>,
}
388fn validate_connector_subgraphs(
390 subgraph_definitions: Vec<SubgraphDefinition>,
391) -> Result<ConnectorsValidationResult, Vec<Issue>> {
392 let mut subgraph_validation_errors = Vec::new();
393 let mut subgraph_validation_hints = Vec::new();
394 let mut parsed_schemas = HashMap::new();
395 let subgraph_definitions = subgraph_definitions
396 .into_iter()
397 .map(|mut subgraph| {
398 let ValidationResult {
399 errors,
400 has_connectors,
401 schema,
402 transformed,
403 } = validate(subgraph.sdl, &subgraph.name);
404 subgraph.sdl = transformed;
405 for error in errors {
406 let issue = Issue {
407 code: error.code.to_string(),
408 message: error.message,
409 locations: error
410 .locations
411 .into_iter()
412 .map(|range| SubgraphLocation {
413 subgraph: Some(subgraph.name.clone()),
414 range: Some(range),
415 })
416 .collect(),
417 severity: convert_severity(error.code.severity()),
418 };
419 if issue.severity == Severity::Error {
420 subgraph_validation_errors.push(issue);
421 } else {
422 subgraph_validation_hints.push(issue);
423 }
424 }
425 parsed_schemas.insert(
426 subgraph.name.clone(),
427 SubgraphSchema {
428 schema,
429 has_connectors,
430 },
431 );
432 subgraph
433 })
434 .collect();
435
436 if !subgraph_validation_errors.is_empty() {
437 return Err(subgraph_validation_errors);
438 }
439 Ok(ConnectorsValidationResult {
440 subgraphs: subgraph_definitions,
441 parsed_subgraphs: parsed_schemas,
442 hints: subgraph_validation_hints,
443 })
444}
445
446fn validate_overrides(schemas: HashMap<String, SubgraphSchema>) -> Vec<Issue> {
453 let mut override_errors = Vec::new();
454 for (subgraph_name, SubgraphSchema { schema, .. }) in &schemas {
455 macro_rules! extract_directives {
458 ($node:ident) => {
459 $node
460 .fields
461 .iter()
462 .flat_map(|(name, field)| {
463 field
464 .directives
465 .iter()
466 .map(move |d| (format!("{}.{}", $node.name, name), d))
467 })
468 .collect::<Vec<_>>()
469 };
470 }
471
472 let override_directives = schema
473 .types
474 .values()
475 .flat_map(|v| match v {
476 ExtendedType::Object(node) => extract_directives!(node),
477 ExtendedType::Interface(node) => extract_directives!(node),
478 ExtendedType::InputObject(node) => extract_directives!(node),
479
480 ExtendedType::Scalar(_) | ExtendedType::Union(_) | ExtendedType::Enum(_) => {
482 Vec::new()
483 }
484 })
485 .filter(|(_, directive)| {
486 directive.name == "override" || directive.name == "federation__override"
489 });
490
491 for (field, directive) in override_directives {
493 let Ok(Some(overridden_subgraph_name)) = directive
496 .argument_by_name("from", schema)
497 .map(|node| node.as_str())
498 else {
499 continue;
500 };
501
502 if schemas
503 .get(overridden_subgraph_name)
504 .is_some_and(|schema| schema.has_connectors)
505 {
506 override_errors.push(Issue {
507 code: "OVERRIDE_ON_CONNECTOR".to_string(),
508 message: format!(
509 r#"Field "{field}" on subgraph "{subgraph_name}" is trying to override connector-enabled subgraph "{overridden_subgraph_name}", which is not yet supported. See https://go.apollo.dev/connectors/limitations#override-is-partially-unsupported"#,
510 ),
511 locations: vec![SubgraphLocation {
512 subgraph: Some(String::from(overridden_subgraph_name)),
513 range: directive.line_column_range(&schema.sources),
514 }],
515 severity: Severity::Error,
516 });
517 }
518 }
519 }
520
521 override_errors
522}
523
524fn sanitize_connectors_issue<'a>(
525 issue: &mut Issue,
526 connector_subgraphs: impl Iterator<Item = (&'a Arc<str>, &'a Connector)>,
527) {
528 for (service_name, connector) in connector_subgraphs {
529 issue.message = issue
530 .message
531 .replace(&**service_name, connector.id.subgraph_name.as_str());
532 }
533}
534
/// Borrowed supergraph SDL text, as returned by
/// `HybridComposition::compose_services_without_satisfiability`.
pub type SupergraphSdl<'a> = &'a str;
536
/// A supergraph SDL paired with a list of issues.
///
/// NOTE(review): nothing in the visible part of this file constructs or reads this type;
/// its intended use should be confirmed against callers.
#[derive(Clone, Debug)]
pub struct PartialSuccess {
    /// The supergraph SDL text.
    pub supergraph_sdl: String,
    /// The issues that accompany the SDL.
    pub issues: Vec<Issue>,
}
543
544fn convert_severity(severity: ValidationSeverity) -> Severity {
545 match severity {
546 ValidationSeverity::Error => Severity::Error,
547 ValidationSeverity::Warning => Severity::Warning,
548 }
549}
550
551fn satisfiability_result_into_issues(
552 result: Result<Vec<Issue>, Vec<Issue>>,
553) -> impl Iterator<Item = Issue> {
554 match result {
555 Ok(hints) => hints.into_iter(),
556 Err(errors) => errors.into_iter(),
557 }
558}
559
560fn assume_subgraph_validated(
563 definition: SubgraphDefinition,
564) -> Result<Subgraph<Validated>, SubgraphError> {
565 Subgraph::parse(
566 definition.name.as_str(),
567 definition.url.as_str(),
568 definition.sdl.as_str(),
569 )
570 .and_then(|s| s.assume_expanded())
571 .map(|s| s.assume_validated())
572}
573
574fn convert_subgraph_error_to_issues(error: SubgraphError) -> Vec<Issue> {
575 error
576 .to_composition_errors()
577 .map(|err| err.into())
578 .collect()
579}