apollo_composition/
lib.rs

1use apollo_compiler::{schema::ExtendedType, Schema};
2use apollo_federation::composition::{
3    expand_subgraphs, merge_subgraphs, post_merge_validations, pre_merge_validations,
4    upgrade_subgraphs_if_necessary, validate_satisfiability, validate_subgraphs, Supergraph,
5};
6use apollo_federation::sources::connect::{
7    expand::{expand_connectors, Connectors, ExpansionResult},
8    validation::{validate, Severity as ValidationSeverity, ValidationResult},
9    Connector,
10};
11use apollo_federation::subgraph::typestate::{Initial, Subgraph, Upgraded, Validated};
12use apollo_federation::subgraph::SubgraphError;
13use apollo_federation_types::build_plugin::{BuildMessage, PluginResult};
14use apollo_federation_types::composition::{convert_subraph_error_to_issues, SubgraphLocation};
15use apollo_federation_types::javascript::{CompositionHint, HintCodeDefinition, MergeResult};
16use apollo_federation_types::{
17    composition::{Issue, Severity},
18    javascript::{SatisfiabilityResult, SubgraphDefinition},
19};
20use either::Either;
21use std::collections::HashMap;
22use std::iter::once;
23use std::sync::Arc;
24
25/// This trait includes all the Rust-side composition logic, plus hooks for the JavaScript side.
26/// If you implement the functions in this trait to build your own JavaScript interface, then you
27/// can call [`HybridComposition::compose`] to run the complete composition process.
28///
29/// JavaScript should be implemented using `@apollo/composition@2.9.0-connectors.0`.
30#[allow(async_fn_in_trait)]
31pub trait HybridComposition {
    /// Call the JavaScript `composeServices` function from `@apollo/composition` plus whatever
    /// extra logic you need. Make sure to disable satisfiability, like `composeServices(definitions, {runSatisfiability: false})`
    ///
    /// # Output
    ///
    /// Return `Some` with the composed supergraph SDL to let composition continue. When `None` is
    /// returned, [`HybridComposition::compose`] stops immediately and does not record any issue of
    /// its own, so any composition errors have to be recorded by the implementation itself.
    async fn compose_services_without_satisfiability(
        &mut self,
        subgraph_definitions: Vec<SubgraphDefinition>,
    ) -> Option<SupergraphSdl>;
38
    /// Call the JavaScript `validateSatisfiability` function from `@apollo/composition` plus whatever
    /// extra logic you need.
    ///
    /// # Input
    ///
    /// The `validateSatisfiability` function wants an argument like `{ supergraphSdl }`. That field
    /// should be the value that's updated when [`Self::update_supergraph_sdl`] is called.
    ///
    /// # Output
    ///
    /// If satisfiability completes from JavaScript, the [`SatisfiabilityResult`] (matching the shape
    /// of that function) should be returned. If satisfiability _can't_ be run, you can return an
    /// `Err(Issue)` instead indicating what went wrong.
    async fn validate_satisfiability(&mut self) -> Result<SatisfiabilityResult, Issue>;
53
    /// Allows the Rust composition code to modify the stored supergraph SDL
    /// (for example, to expand connectors).
    ///
    /// [`HybridComposition::compose`] calls this with the connector-expanded SDL right before it
    /// runs satisfiability, and once more with the original SDL afterwards.
    fn update_supergraph_sdl(&mut self, supergraph_sdl: String);
57
    /// When the Rust composition/validation code finds issues, it will call this method to add
    /// them to the list of issues that will be returned to the user.
    ///
    /// The iterator can yield errors as well as non-error issues (hints/warnings). It is up to
    /// the implementor of this trait to convert each [`Issue`] into whatever type it stores.
    fn add_issues<Source: Iterator<Item = Issue>>(&mut self, issues: Source);
63
64    /// Runs the complete composition process, hooking into both the Rust and JavaScript implementations.
65    ///
66    /// # Asyncness
67    ///
68    /// While this function is async to allow for flexible JavaScript execution, it is a CPU-heavy task.
69    /// Take care when consuming this in an async context, as it may block longer than desired.
70    ///
71    /// # Algorithm
72    ///
73    /// 1. Run Rust-based validation on the subgraphs
74    /// 2. Call [`compose_services_without_satisfiability`] to run JavaScript-based composition
75    /// 3. Run Rust-based validation on the supergraph
76    /// 4. Call [`validate_satisfiability`] to run JavaScript-based validation on the supergraph
77    async fn compose(&mut self, subgraph_definitions: Vec<SubgraphDefinition>) {
78        // connectors subgraph validations
79        let ConnectorsValidationResult {
80            subgraphs,
81            parsed_subgraphs,
82            hints: connector_hints,
83        } = match validate_connector_subgraphs(subgraph_definitions) {
84            Ok(results) => results,
85            Err(errors) => {
86                self.add_issues(errors.into_iter());
87                return;
88            }
89        };
90        self.add_issues(connector_hints.into_iter());
91
92        let Some(supergraph_sdl) = self
93            .compose_services_without_satisfiability(subgraphs)
94            .await
95        else {
96            return;
97        };
98
99        // Any issues with overrides are fatal since they'll cause errors in expansion,
100        // so we return early if we see any.
101        let override_errors = validate_overrides(parsed_subgraphs);
102        if !override_errors.is_empty() {
103            self.add_issues(override_errors.into_iter());
104            return;
105        }
106
107        let expansion_result = match expand_connectors(supergraph_sdl, &Default::default()) {
108            Ok(result) => result,
109            Err(err) => {
110                self.add_issues(once(Issue {
111                    code: "INTERNAL_ERROR".to_string(),
112                    message: format!(
113                        "Composition failed due to an internal error, please report this: {}",
114                        err
115                    ),
116                    locations: vec![],
117                    severity: Severity::Error,
118                }));
119                return;
120            }
121        };
122        match expansion_result {
123            ExpansionResult::Expanded {
124                raw_sdl,
125                connectors: Connectors {
126                    by_service_name, ..
127                },
128                ..
129            } => {
130                let original_supergraph_sdl = supergraph_sdl.to_string();
131                self.update_supergraph_sdl(raw_sdl);
132                let satisfiability_result = self.validate_satisfiability().await;
133                self.add_issues(
134                    satisfiability_result_into_issues(satisfiability_result).map(|mut issue| {
135                        sanitize_connectors_issue(&mut issue, by_service_name.iter());
136                        issue
137                    }),
138                );
139
140                self.update_supergraph_sdl(original_supergraph_sdl);
141            }
142            ExpansionResult::Unchanged => {
143                let satisfiability_result = self.validate_satisfiability().await;
144                self.add_issues(satisfiability_result_into_issues(satisfiability_result));
145            }
146        }
147    }
148
149    /// <div class="warning">*** EXPERIMENTAL ***</div>
150    ///
151    /// Runs the composition process with granular composition phases that allow replacing individual
152    /// steps with Rust and/or JavaScript implementations.
153    ///
154    /// 1. <connectors> subgraph validation
155    /// 2. Initialize subgraphs - parses SDL into a GraphQL schema
156    /// 3. Expands subgraphs - adds all missing federation definitions
157    /// 4. Upgrade subgraphs - upgrades fed v1 schemas to fed v2
158    /// 5. Validate subgraphs
159    /// 6. Pre-merge validations (includes connectors validations)
160    /// 7. Merge subgraphs into a supergrpah
161    /// 8. Post merge validations
162    /// 9. <connectors> expand supergraph
163    /// 10. Validate satisfiability
164    ///
165    /// In case of a composition failure, we return a list of errors from the current composition
166    /// phase.
167    async fn experimental_compose(
168        mut self,
169        subgraph_definitions: Vec<SubgraphDefinition>,
170    ) -> Result<PluginResult, Vec<Issue>>
171    where
172        Self: Sized,
173    {
174        let upgraded_subgraphs = self
175            .experimental_upgrade_subgraphs(subgraph_definitions)
176            .await?;
177        let validated_subgraphs = self
178            .experimental_validate_subgraphs(upgraded_subgraphs)
179            .await?;
180
181        // connectors validations
182        // Any issues with overrides are fatal since they'll cause errors in expansion,
183        // so we return early if we see any.
184        let ConnectorsValidationResult {
185            subgraphs: connected_subgraphs,
186            parsed_subgraphs,
187            hints: connector_hints,
188        } = validate_connector_subgraphs(validated_subgraphs)?;
189        let override_errors = validate_overrides(parsed_subgraphs);
190        if !override_errors.is_empty() {
191            return Err(override_errors);
192        }
193
194        // merge
195        let merge_result = self
196            .experimental_merge_subgraphs(connected_subgraphs)
197            .await?;
198
199        // expand connectors as needed
200        let supergraph_sdl = merge_result.supergraph.clone();
201        let expansion_result = match expand_connectors(&supergraph_sdl, &Default::default()) {
202            Ok(result) => result,
203            Err(err) => {
204                return Err(vec![err.into()]);
205            }
206        };
207
208        // verify satisfiability
209        match expansion_result {
210            ExpansionResult::Expanded {
211                raw_sdl,
212                connectors: Connectors {
213                    by_service_name, ..
214                },
215                ..
216            } => {
217                self.experimental_validate_satisfiability(raw_sdl.as_str())
218                    .await
219                    .map(|s| {
220                        let mut composition_hints = merge_result.hints;
221                        composition_hints.extend(s);
222
223                        let mut build_messages: Vec<BuildMessage> =
224                            connector_hints.into_iter().map(|h| h.into()).collect();
225                        build_messages.extend(composition_hints.into_iter().map(|h| {
226                            let mut issue = Into::<Issue>::into(h);
227                            sanitize_connectors_issue(&mut issue, by_service_name.iter());
228                            issue.into()
229                        }));
230                        // return original supergraph
231                        PluginResult::new(Ok(supergraph_sdl), build_messages)
232                    })
233                    .map_err(|err| {
234                        err.into_iter()
235                            .map(|mut issue| {
236                                sanitize_connectors_issue(&mut issue, by_service_name.iter());
237                                issue
238                            })
239                            .collect()
240                    })
241            }
242            ExpansionResult::Unchanged => self
243                .experimental_validate_satisfiability(supergraph_sdl.as_str())
244                .await
245                .map(|s| {
246                    let mut hints = merge_result.hints;
247                    hints.extend(s);
248
249                    let build_messages: Vec<BuildMessage> = hints
250                        .into_iter()
251                        .map(|h| Into::<Issue>::into(h).into())
252                        .collect();
253                    PluginResult::new(Ok(supergraph_sdl), build_messages)
254                }),
255        }
256    }
257
258    /// Maps to upgradeSubgraphsIfNecessary and performs following steps
259    ///
260    /// 1. Parses raw SDL schemas into Subgraph<Initial>
261    /// 2. Adds missing federation definitions to the subgraph schemas
262    /// 3. Upgrades federation v1 subgraphs to federation v2 schemas.
263    ///    This is a no-op if it is already a federation v2 subgraph.
264    async fn experimental_upgrade_subgraphs(
265        &mut self,
266        subgraphs: Vec<SubgraphDefinition>,
267    ) -> Result<Vec<SubgraphDefinition>, Vec<Issue>> {
268        let mut issues: Vec<Issue> = vec![];
269        let initial: Vec<Subgraph<Initial>> = subgraphs
270            .into_iter()
271            .map(|s| s.try_into())
272            .filter_map(|r| {
273                r.map_err(|e: SubgraphError| issues.extend(convert_subraph_error_to_issues(e)))
274                    .ok()
275            })
276            .collect();
277        if !issues.is_empty() {
278            return Err(issues);
279        }
280        expand_subgraphs(initial)
281            .and_then(upgrade_subgraphs_if_necessary)
282            .map(|subgraphs| subgraphs.into_iter().map(|s| s.into()).collect())
283            .map_err(|errors| errors.into_iter().map(Issue::from).collect::<Vec<_>>())
284    }
285
286    /// Performs all subgraph validations.
287    async fn experimental_validate_subgraphs(
288        &mut self,
289        subgraphs: Vec<SubgraphDefinition>,
290    ) -> Result<Vec<SubgraphDefinition>, Vec<Issue>> {
291        let mut issues = vec![];
292        let upgraded: Vec<Subgraph<Upgraded>> = subgraphs
293            .into_iter()
294            .map(assume_subgraph_upgraded)
295            .filter_map(|r| {
296                r.map_err(|e| issues.extend(convert_subraph_error_to_issues(e)))
297                    .ok()
298            })
299            .collect();
300        if !issues.is_empty() {
301            // this should never happen
302            return Err(issues);
303        }
304        validate_subgraphs(upgraded)
305            .map(|subgraphs| subgraphs.into_iter().map(|s| s.into()).collect())
306            .map_err(|errors| errors.into_iter().map(Issue::from).collect::<Vec<_>>())
307    }
308
309    async fn experimental_merge_subgraphs(
310        &mut self,
311        subgraphs: Vec<SubgraphDefinition>,
312    ) -> Result<MergeResult, Vec<Issue>> {
313        let mut subgraph_errors = vec![];
314        let validated: Vec<Subgraph<Validated>> = subgraphs
315            .into_iter()
316            .map(assume_subgraph_validated)
317            .filter_map(|r| {
318                r.map_err(|e| subgraph_errors.extend(convert_subraph_error_to_issues(e)))
319                    .ok()
320            })
321            .collect();
322        if !subgraph_errors.is_empty() {
323            // this should never happen
324            return Err(subgraph_errors);
325        }
326        pre_merge_validations(&validated)
327            .map_err(|errors| errors.into_iter().map(Issue::from).collect::<Vec<_>>())?;
328        let supergraph = merge_subgraphs(validated)
329            .map_err(|errors| errors.into_iter().map(Issue::from).collect::<Vec<_>>())?;
330        post_merge_validations(&supergraph)
331            .map_err(|errors| errors.into_iter().map(Issue::from).collect::<Vec<_>>())?;
332        let hints = supergraph
333            .hints()
334            .iter()
335            .map(|h| CompositionHint {
336                message: h.message.clone(),
337                definition: HintCodeDefinition {
338                    code: h.code.clone(),
339                },
340                nodes: None,
341            })
342            .collect();
343        Ok(MergeResult {
344            supergraph: supergraph.schema().to_string(),
345            hints,
346        })
347    }
348
349    async fn experimental_validate_satisfiability(
350        &mut self,
351        supergraph_sdl: &str,
352    ) -> Result<Vec<CompositionHint>, Vec<Issue>> {
353        let supergraph = Supergraph::parse(supergraph_sdl).map_err(|e| vec![Issue::from(e)])?;
354        validate_satisfiability(supergraph)
355            .map(|s| {
356                s.hints()
357                    .iter()
358                    .map(|h| CompositionHint {
359                        message: h.message.clone(),
360                        definition: HintCodeDefinition {
361                            code: h.code.clone(),
362                        },
363                        nodes: None,
364                    })
365                    .collect()
366            })
367            .map_err(|errors| errors.into_iter().map(Issue::from).collect::<Vec<_>>())
368    }
369}
370
/// The outcome of connectors validation for a single subgraph, as stored by
/// `validate_connector_subgraphs`.
struct SubgraphSchema {
    /// The schema returned by connectors validation for this subgraph.
    schema: Schema,
    /// The `has_connectors` flag returned by connectors validation for this subgraph.
    has_connectors: bool,
}
375
/// Successful result of `validate_connector_subgraphs`.
struct ConnectorsValidationResult {
    /// The input subgraph definitions with their SDL replaced by the transformed SDL that
    /// connectors validation returned.
    subgraphs: Vec<SubgraphDefinition>,
    /// Validation output for every subgraph, keyed by subgraph name.
    parsed_subgraphs: HashMap<String, SubgraphSchema>,
    /// Validation findings whose severity is not an error.
    hints: Vec<Issue>,
}
381// TODO this should eventually move under expand/validate subgraph logic
382fn validate_connector_subgraphs(
383    subgraph_definitions: Vec<SubgraphDefinition>,
384) -> Result<ConnectorsValidationResult, Vec<Issue>> {
385    let mut subgraph_validation_errors = Vec::new();
386    let mut subgraph_validation_hints = Vec::new();
387    let mut parsed_schemas = HashMap::new();
388    let subgraph_definitions = subgraph_definitions
389        .into_iter()
390        .map(|mut subgraph| {
391            let ValidationResult {
392                errors,
393                has_connectors,
394                schema,
395                transformed,
396            } = validate(subgraph.sdl, &subgraph.name);
397            subgraph.sdl = transformed;
398            for error in errors {
399                let issue = Issue {
400                    code: error.code.to_string(),
401                    message: error.message,
402                    locations: error
403                        .locations
404                        .into_iter()
405                        .map(|range| SubgraphLocation {
406                            subgraph: Some(subgraph.name.clone()),
407                            range: Some(range),
408                        })
409                        .collect(),
410                    severity: convert_severity(error.code.severity()),
411                };
412                if issue.severity == Severity::Error {
413                    subgraph_validation_errors.push(issue);
414                } else {
415                    subgraph_validation_hints.push(issue);
416                }
417            }
418            parsed_schemas.insert(
419                subgraph.name.clone(),
420                SubgraphSchema {
421                    schema,
422                    has_connectors,
423                },
424            );
425            subgraph
426        })
427        .collect();
428
429    if !subgraph_validation_errors.is_empty() {
430        return Err(subgraph_validation_errors);
431    }
432    Ok(ConnectorsValidationResult {
433        subgraphs: subgraph_definitions,
434        parsed_subgraphs: parsed_schemas,
435        hints: subgraph_validation_hints,
436    })
437}
438
439/// Validate overrides for connector-related subgraphs
440///
441/// Overrides mess with the supergraph in ways that can be difficult to detect when
442/// expanding connectors; the supergraph may omit overridden fields and other shenanigans.
443/// To allow for a better developer experience, we check here if any connector-enabled subgraphs
444/// have fields overridden.
445fn validate_overrides(schemas: HashMap<String, SubgraphSchema>) -> Vec<Issue> {
446    let mut override_errors = Vec::new();
447    for (subgraph_name, SubgraphSchema { schema, .. }) in &schemas {
448        // We need to grab all fields in the schema since only fields can have the @override
449        // directive attached
450        macro_rules! extract_directives {
451            ($node:ident) => {
452                $node
453                    .fields
454                    .iter()
455                    .flat_map(|(name, field)| {
456                        field
457                            .directives
458                            .iter()
459                            .map(move |d| (format!("{}.{}", $node.name, name), d))
460                    })
461                    .collect::<Vec<_>>()
462            };
463        }
464
465        let override_directives = schema
466            .types
467            .values()
468            .flat_map(|v| match v {
469                ExtendedType::Object(node) => extract_directives!(node),
470                ExtendedType::Interface(node) => extract_directives!(node),
471                ExtendedType::InputObject(node) => extract_directives!(node),
472
473                // These types do not have fields
474                ExtendedType::Scalar(_) | ExtendedType::Union(_) | ExtendedType::Enum(_) => {
475                    Vec::new()
476                }
477            })
478            .filter(|(_, directive)| {
479                // TODO: The directive name for @override could have been aliased
480                // at the SDL level, so we'll need to extract the aliased name here instead
481                directive.name == "override" || directive.name == "federation__override"
482            });
483
484        // Now see if we have any overrides that try to reference connector subgraphs
485        for (field, directive) in override_directives {
486            // If the override directive does not have a valid `from` field, then there is
487            // no point trying to validate it, as later steps will validate the entire schema.
488            let Ok(Some(overridden_subgraph_name)) = directive
489                .argument_by_name("from", schema)
490                .map(|node| node.as_str())
491            else {
492                continue;
493            };
494
495            if schemas
496                .get(overridden_subgraph_name)
497                .is_some_and(|schema| schema.has_connectors)
498            {
499                override_errors.push(Issue {
500                        code: "OVERRIDE_ON_CONNECTOR".to_string(),
501                        message: format!(
502                            r#"Field "{}" on subgraph "{}" is trying to override connector-enabled subgraph "{}", which is not yet supported. See https://go.apollo.dev/connectors/limitations#override-is-partially-unsupported"#,
503                            field,
504                            subgraph_name,
505                            overridden_subgraph_name,
506                        ),
507                        locations: vec![SubgraphLocation {
508                            subgraph: Some(String::from(overridden_subgraph_name)),
509                            range: directive.line_column_range(&schema.sources),
510                        }],
511                        severity: Severity::Error,
512                    });
513            }
514        }
515    }
516
517    override_errors
518}
519
520fn sanitize_connectors_issue<'a>(
521    issue: &mut Issue,
522    connector_subgraphs: impl Iterator<Item = (&'a Arc<str>, &'a Connector)>,
523) {
524    for (service_name, connector) in connector_subgraphs {
525        issue.message = issue
526            .message
527            .replace(&**service_name, connector.id.subgraph_name.as_str());
528    }
529}
530
/// A borrowed supergraph SDL string, as returned by
/// [`HybridComposition::compose_services_without_satisfiability`].
pub type SupergraphSdl<'a> = &'a str;
532
/// A successfully composed supergraph, optionally with some issues that should be addressed.
#[derive(Clone, Debug)]
pub struct PartialSuccess {
    /// The SDL of the composed supergraph.
    pub supergraph_sdl: String,
    /// Issues that accompany the otherwise successful composition.
    pub issues: Vec<Issue>,
}
539
540fn convert_severity(severity: ValidationSeverity) -> Severity {
541    match severity {
542        ValidationSeverity::Error => Severity::Error,
543        ValidationSeverity::Warning => Severity::Warning,
544    }
545}
546
547fn satisfiability_result_into_issues(
548    satisfiability_result: Result<SatisfiabilityResult, Issue>,
549) -> Either<impl Iterator<Item = Issue>, impl Iterator<Item = Issue>> {
550    match satisfiability_result {
551        Ok(satisfiability_result) => Either::Left(
552            satisfiability_result
553                .errors
554                .into_iter()
555                .flatten()
556                .map(Issue::from)
557                .chain(
558                    satisfiability_result
559                        .hints
560                        .into_iter()
561                        .flatten()
562                        .map(Issue::from),
563                ),
564        ),
565        Err(issue) => Either::Right(once(issue)),
566    }
567}
568
569// converts subgraph definitions to Subgraph<Upgraded> by assuming schema is valid and was already upgraded
570fn assume_subgraph_upgraded(
571    definition: SubgraphDefinition,
572) -> Result<Subgraph<Upgraded>, SubgraphError> {
573    Subgraph::parse(
574        definition.name.as_str(),
575        definition.url.as_str(),
576        definition.sdl.as_str(),
577    )
578    .and_then(|s| s.assume_expanded())
579    .map(|s| s.assume_upgraded())
580}
581
582// converts subgraph definitions to Subgraph<Validated> by assuming schema is valid and was already validated
583fn assume_subgraph_validated(
584    definition: SubgraphDefinition,
585) -> Result<Subgraph<Validated>, SubgraphError> {
586    assume_subgraph_upgraded(definition).and_then(|s| s.assume_validated())
587}