apollo_composition/
lib.rs

1use apollo_compiler::{schema::ExtendedType, Schema};
2use apollo_federation::composition::{
3    expand_subgraphs, merge_subgraphs, post_merge_validations, pre_merge_validations,
4    upgrade_subgraphs_if_necessary, validate_satisfiability, validate_subgraphs, Supergraph,
5};
6use apollo_federation::connectors::{
7    expand::{expand_connectors, Connectors, ExpansionResult},
8    validation::{validate, Severity as ValidationSeverity, ValidationResult},
9    Connector,
10};
11use apollo_federation::subgraph::typestate::{Initial, Subgraph, Upgraded, Validated};
12use apollo_federation::subgraph::SubgraphError;
13use apollo_federation_types::build_plugin::{BuildMessage, PluginResult};
14use apollo_federation_types::composition::{convert_subraph_error_to_issues, SubgraphLocation};
15use apollo_federation_types::javascript::{CompositionHint, HintCodeDefinition, MergeResult};
16use apollo_federation_types::{
17    composition::{Issue, Severity},
18    javascript::{SatisfiabilityResult, SubgraphDefinition},
19};
20use either::Either;
21use std::collections::HashMap;
22use std::iter::once;
23use std::sync::Arc;
24
25/// This trait includes all the Rust-side composition logic, plus hooks for the JavaScript side.
26/// If you implement the functions in this trait to build your own JavaScript interface, then you
27/// can call [`HybridComposition::compose`] to run the complete composition process.
28///
29/// JavaScript should be implemented using `@apollo/composition@2.9.0-connectors.0`.
30#[allow(async_fn_in_trait)]
31pub trait HybridComposition {
    /// Call the JavaScript `composeServices` function from `@apollo/composition` plus whatever
    /// extra logic you need. Make sure to disable satisfiability, like
    /// `composeServices(definitions, {runSatisfiability: false})`.
    ///
    /// The `subgraph_definitions` passed in are the ones that came out of the Rust-side
    /// connectors validation (their SDL may have been transformed by it).
    ///
    /// When this returns `None`, [`HybridComposition::compose`] stops immediately and does not
    /// add any issue of its own. Otherwise the returned SDL is handed to connectors expansion.
    async fn compose_services_without_satisfiability(
        &mut self,
        subgraph_definitions: Vec<SubgraphDefinition>,
    ) -> Option<SupergraphSdl>;
38
    /// Call the JavaScript `validateSatisfiability` function from `@apollo/composition` plus whatever
    /// extra logic you need.
    ///
    /// # Input
    ///
    /// The `validateSatisfiability` function wants an argument like `{ supergraphSdl }`. That field
    /// should be the value that's updated when [`Self::update_supergraph_sdl`] is called.
    ///
    /// # Output
    ///
    /// If satisfiability completes from JavaScript, the [`SatisfiabilityResult`] (matching the shape
    /// of that function) should be returned. If satisfiability _can't_ be run, you can return an
    /// `Err(Issue)` instead indicating what went wrong.
    async fn validate_satisfiability(&mut self) -> Result<SatisfiabilityResult, Issue>;
53
    /// Allows the Rust composition code to modify the stored supergraph SDL
    /// (for example, to expand connectors).
    ///
    /// [`HybridComposition::compose`] calls this with the connectors-expanded SDL right before
    /// running satisfiability, and once more afterwards with the originally composed SDL.
    fn update_supergraph_sdl(&mut self, supergraph_sdl: String);
57
    /// When the Rust composition/validation code finds issues, it will call this method to add
    /// them to the list of issues that will be returned to the user.
    ///
    /// The implementor of this trait is responsible for converting each [`Issue`] into
    /// whatever representation it reports (for example through a `From<Issue>` impl).
    fn add_issues<Source: Iterator<Item = Issue>>(&mut self, issues: Source);
63
64    /// Runs the complete composition process, hooking into both the Rust and JavaScript implementations.
65    ///
66    /// # Asyncness
67    ///
68    /// While this function is async to allow for flexible JavaScript execution, it is a CPU-heavy task.
69    /// Take care when consuming this in an async context, as it may block longer than desired.
70    ///
71    /// # Algorithm
72    ///
73    /// 1. Run Rust-based validation on the subgraphs
74    /// 2. Call [`compose_services_without_satisfiability`] to run JavaScript-based composition
75    /// 3. Run Rust-based validation on the supergraph
76    /// 4. Call [`validate_satisfiability`] to run JavaScript-based validation on the supergraph
77    async fn compose(&mut self, subgraph_definitions: Vec<SubgraphDefinition>) {
78        // connectors subgraph validations
79        let ConnectorsValidationResult {
80            subgraphs,
81            parsed_subgraphs,
82            hints: connector_hints,
83        } = match validate_connector_subgraphs(subgraph_definitions) {
84            Ok(results) => results,
85            Err(errors) => {
86                self.add_issues(errors.into_iter());
87                return;
88            }
89        };
90        self.add_issues(connector_hints.into_iter());
91
92        let Some(supergraph_sdl) = self
93            .compose_services_without_satisfiability(subgraphs)
94            .await
95        else {
96            return;
97        };
98
99        // Any issues with overrides are fatal since they'll cause errors in expansion,
100        // so we return early if we see any.
101        let override_errors = validate_overrides(parsed_subgraphs);
102        if !override_errors.is_empty() {
103            self.add_issues(override_errors.into_iter());
104            return;
105        }
106
107        let expansion_result = match expand_connectors(supergraph_sdl, &Default::default()) {
108            Ok(result) => result,
109            Err(err) => {
110                self.add_issues(once(Issue {
111                    code: "INTERNAL_ERROR".to_string(),
112                    message: format!(
113                        "Composition failed due to an internal error, please report this: {err}"
114                    ),
115                    locations: vec![],
116                    severity: Severity::Error,
117                }));
118                return;
119            }
120        };
121        match expansion_result {
122            ExpansionResult::Expanded {
123                raw_sdl,
124                connectors: Connectors {
125                    by_service_name, ..
126                },
127                ..
128            } => {
129                let original_supergraph_sdl = supergraph_sdl.to_string();
130                self.update_supergraph_sdl(raw_sdl);
131                let satisfiability_result = self.validate_satisfiability().await;
132                self.add_issues(
133                    satisfiability_result_into_issues(satisfiability_result).map(|mut issue| {
134                        sanitize_connectors_issue(&mut issue, by_service_name.iter());
135                        issue
136                    }),
137                );
138
139                self.update_supergraph_sdl(original_supergraph_sdl);
140            }
141            ExpansionResult::Unchanged => {
142                let satisfiability_result = self.validate_satisfiability().await;
143                self.add_issues(satisfiability_result_into_issues(satisfiability_result));
144            }
145        }
146    }
147
148    /// <div class="warning">*** EXPERIMENTAL ***</div>
149    ///
150    /// Runs the composition process with granular composition phases that allow replacing individual
151    /// steps with Rust and/or JavaScript implementations.
152    ///
153    /// 1. <connectors> subgraph validation
154    /// 2. Initialize subgraphs - parses SDL into a GraphQL schema
155    /// 3. Expands subgraphs - adds all missing federation definitions
156    /// 4. Upgrade subgraphs - upgrades fed v1 schemas to fed v2
157    /// 5. Validate subgraphs
158    /// 6. Pre-merge validations (includes connectors validations)
159    /// 7. Merge subgraphs into a supergrpah
160    /// 8. Post merge validations
161    /// 9. <connectors> expand supergraph
162    /// 10. Validate satisfiability
163    ///
164    /// In case of a composition failure, we return a list of errors from the current composition
165    /// phase.
166    async fn experimental_compose(
167        mut self,
168        subgraph_definitions: Vec<SubgraphDefinition>,
169    ) -> Result<PluginResult, Vec<Issue>>
170    where
171        Self: Sized,
172    {
173        let upgraded_subgraphs = self
174            .experimental_upgrade_subgraphs(subgraph_definitions)
175            .await?;
176        let validated_subgraphs = self
177            .experimental_validate_subgraphs(upgraded_subgraphs)
178            .await?;
179
180        // connectors validations
181        // Any issues with overrides are fatal since they'll cause errors in expansion,
182        // so we return early if we see any.
183        let ConnectorsValidationResult {
184            subgraphs: connected_subgraphs,
185            parsed_subgraphs,
186            hints: connector_hints,
187        } = validate_connector_subgraphs(validated_subgraphs)?;
188        let override_errors = validate_overrides(parsed_subgraphs);
189        if !override_errors.is_empty() {
190            return Err(override_errors);
191        }
192
193        // merge
194        let merge_result = self
195            .experimental_merge_subgraphs(connected_subgraphs)
196            .await?;
197
198        // expand connectors as needed
199        let supergraph_sdl = merge_result.supergraph.clone();
200        let expansion_result = match expand_connectors(&supergraph_sdl, &Default::default()) {
201            Ok(result) => result,
202            Err(err) => {
203                return Err(vec![err.into()]);
204            }
205        };
206
207        // verify satisfiability
208        match expansion_result {
209            ExpansionResult::Expanded {
210                raw_sdl,
211                connectors: Connectors {
212                    by_service_name, ..
213                },
214                ..
215            } => {
216                self.experimental_validate_satisfiability(raw_sdl.as_str())
217                    .await
218                    .map(|s| {
219                        let mut composition_hints = merge_result.hints;
220                        composition_hints.extend(s);
221
222                        let mut build_messages: Vec<BuildMessage> =
223                            connector_hints.into_iter().map(|h| h.into()).collect();
224                        build_messages.extend(composition_hints.into_iter().map(|h| {
225                            let mut issue = Into::<Issue>::into(h);
226                            sanitize_connectors_issue(&mut issue, by_service_name.iter());
227                            issue.into()
228                        }));
229                        // return original supergraph
230                        PluginResult::new(Ok(supergraph_sdl), build_messages)
231                    })
232                    .map_err(|err| {
233                        err.into_iter()
234                            .map(|mut issue| {
235                                sanitize_connectors_issue(&mut issue, by_service_name.iter());
236                                issue
237                            })
238                            .collect()
239                    })
240            }
241            ExpansionResult::Unchanged => self
242                .experimental_validate_satisfiability(supergraph_sdl.as_str())
243                .await
244                .map(|s| {
245                    let mut hints = merge_result.hints;
246                    hints.extend(s);
247
248                    let build_messages: Vec<BuildMessage> = hints
249                        .into_iter()
250                        .map(|h| Into::<Issue>::into(h).into())
251                        .collect();
252                    PluginResult::new(Ok(supergraph_sdl), build_messages)
253                }),
254        }
255    }
256
257    /// Maps to upgradeSubgraphsIfNecessary and performs following steps
258    ///
259    /// 1. Parses raw SDL schemas into Subgraph<Initial>
260    /// 2. Adds missing federation definitions to the subgraph schemas
261    /// 3. Upgrades federation v1 subgraphs to federation v2 schemas.
262    ///    This is a no-op if it is already a federation v2 subgraph.
263    async fn experimental_upgrade_subgraphs(
264        &mut self,
265        subgraphs: Vec<SubgraphDefinition>,
266    ) -> Result<Vec<SubgraphDefinition>, Vec<Issue>> {
267        let mut issues: Vec<Issue> = vec![];
268        let initial: Vec<Subgraph<Initial>> = subgraphs
269            .into_iter()
270            .map(|s| s.try_into())
271            .filter_map(|r| {
272                r.map_err(|e: SubgraphError| issues.extend(convert_subraph_error_to_issues(e)))
273                    .ok()
274            })
275            .collect();
276        if !issues.is_empty() {
277            return Err(issues);
278        }
279        expand_subgraphs(initial)
280            .and_then(upgrade_subgraphs_if_necessary)
281            .map(|subgraphs| subgraphs.into_iter().map(|s| s.into()).collect())
282            .map_err(|errors| errors.into_iter().map(Issue::from).collect::<Vec<_>>())
283    }
284
285    /// Performs all subgraph validations.
286    async fn experimental_validate_subgraphs(
287        &mut self,
288        subgraphs: Vec<SubgraphDefinition>,
289    ) -> Result<Vec<SubgraphDefinition>, Vec<Issue>> {
290        let mut issues = vec![];
291        let upgraded: Vec<Subgraph<Upgraded>> = subgraphs
292            .into_iter()
293            .map(assume_subgraph_upgraded)
294            .filter_map(|r| {
295                r.map_err(|e| issues.extend(convert_subraph_error_to_issues(e)))
296                    .ok()
297            })
298            .collect();
299        if !issues.is_empty() {
300            // this should never happen
301            return Err(issues);
302        }
303        validate_subgraphs(upgraded)
304            .map(|subgraphs| subgraphs.into_iter().map(|s| s.into()).collect())
305            .map_err(|errors| errors.into_iter().map(Issue::from).collect::<Vec<_>>())
306    }
307
308    async fn experimental_merge_subgraphs(
309        &mut self,
310        subgraphs: Vec<SubgraphDefinition>,
311    ) -> Result<MergeResult, Vec<Issue>> {
312        let mut subgraph_errors = vec![];
313        let validated: Vec<Subgraph<Validated>> = subgraphs
314            .into_iter()
315            .map(assume_subgraph_validated)
316            .filter_map(|r| {
317                r.map_err(|e| subgraph_errors.extend(convert_subraph_error_to_issues(e)))
318                    .ok()
319            })
320            .collect();
321        if !subgraph_errors.is_empty() {
322            // this should never happen
323            return Err(subgraph_errors);
324        }
325        pre_merge_validations(&validated)
326            .map_err(|errors| errors.into_iter().map(Issue::from).collect::<Vec<_>>())?;
327        let supergraph = merge_subgraphs(validated)
328            .map_err(|errors| errors.into_iter().map(Issue::from).collect::<Vec<_>>())?;
329        post_merge_validations(&supergraph)
330            .map_err(|errors| errors.into_iter().map(Issue::from).collect::<Vec<_>>())?;
331        let hints = supergraph
332            .hints()
333            .iter()
334            .map(|h| CompositionHint {
335                message: h.message.clone(),
336                definition: HintCodeDefinition {
337                    code: h.code.clone(),
338                },
339                nodes: None,
340            })
341            .collect();
342        Ok(MergeResult {
343            supergraph: supergraph.schema().to_string(),
344            hints,
345        })
346    }
347
348    async fn experimental_validate_satisfiability(
349        &mut self,
350        supergraph_sdl: &str,
351    ) -> Result<Vec<CompositionHint>, Vec<Issue>> {
352        let supergraph = Supergraph::parse(supergraph_sdl).map_err(|e| vec![Issue::from(e)])?;
353        validate_satisfiability(supergraph)
354            .map(|s| {
355                s.hints()
356                    .iter()
357                    .map(|h| CompositionHint {
358                        message: h.message.clone(),
359                        definition: HintCodeDefinition {
360                            code: h.code.clone(),
361                        },
362                        nodes: None,
363                    })
364                    .collect()
365            })
366            .map_err(|errors| errors.into_iter().map(Issue::from).collect::<Vec<_>>())
367    }
368}
369
/// Per-subgraph output of connectors validation that later checks need.
struct SubgraphSchema {
    /// The schema returned by connectors validation for this subgraph.
    schema: Schema,
    /// The `has_connectors` flag returned by connectors validation for this subgraph.
    has_connectors: bool,
}
374
/// Successful outcome of [`validate_connector_subgraphs`].
struct ConnectorsValidationResult {
    /// The input subgraph definitions, with their SDL replaced by the transformed SDL
    /// returned from connectors validation.
    subgraphs: Vec<SubgraphDefinition>,
    /// Parsed schema and connectors flag for each subgraph, keyed by subgraph name.
    parsed_subgraphs: HashMap<String, SubgraphSchema>,
    /// Validation findings whose severity is not `Error`.
    hints: Vec<Issue>,
}
380// TODO this should eventually move under expand/validate subgraph logic
381fn validate_connector_subgraphs(
382    subgraph_definitions: Vec<SubgraphDefinition>,
383) -> Result<ConnectorsValidationResult, Vec<Issue>> {
384    let mut subgraph_validation_errors = Vec::new();
385    let mut subgraph_validation_hints = Vec::new();
386    let mut parsed_schemas = HashMap::new();
387    let subgraph_definitions = subgraph_definitions
388        .into_iter()
389        .map(|mut subgraph| {
390            let ValidationResult {
391                errors,
392                has_connectors,
393                schema,
394                transformed,
395            } = validate(subgraph.sdl, &subgraph.name);
396            subgraph.sdl = transformed;
397            for error in errors {
398                let issue = Issue {
399                    code: error.code.to_string(),
400                    message: error.message,
401                    locations: error
402                        .locations
403                        .into_iter()
404                        .map(|range| SubgraphLocation {
405                            subgraph: Some(subgraph.name.clone()),
406                            range: Some(range),
407                        })
408                        .collect(),
409                    severity: convert_severity(error.code.severity()),
410                };
411                if issue.severity == Severity::Error {
412                    subgraph_validation_errors.push(issue);
413                } else {
414                    subgraph_validation_hints.push(issue);
415                }
416            }
417            parsed_schemas.insert(
418                subgraph.name.clone(),
419                SubgraphSchema {
420                    schema,
421                    has_connectors,
422                },
423            );
424            subgraph
425        })
426        .collect();
427
428    if !subgraph_validation_errors.is_empty() {
429        return Err(subgraph_validation_errors);
430    }
431    Ok(ConnectorsValidationResult {
432        subgraphs: subgraph_definitions,
433        parsed_subgraphs: parsed_schemas,
434        hints: subgraph_validation_hints,
435    })
436}
437
438/// Validate overrides for connector-related subgraphs
439///
440/// Overrides mess with the supergraph in ways that can be difficult to detect when
441/// expanding connectors; the supergraph may omit overridden fields and other shenanigans.
442/// To allow for a better developer experience, we check here if any connector-enabled subgraphs
443/// have fields overridden.
444fn validate_overrides(schemas: HashMap<String, SubgraphSchema>) -> Vec<Issue> {
445    let mut override_errors = Vec::new();
446    for (subgraph_name, SubgraphSchema { schema, .. }) in &schemas {
447        // We need to grab all fields in the schema since only fields can have the @override
448        // directive attached
449        macro_rules! extract_directives {
450            ($node:ident) => {
451                $node
452                    .fields
453                    .iter()
454                    .flat_map(|(name, field)| {
455                        field
456                            .directives
457                            .iter()
458                            .map(move |d| (format!("{}.{}", $node.name, name), d))
459                    })
460                    .collect::<Vec<_>>()
461            };
462        }
463
464        let override_directives = schema
465            .types
466            .values()
467            .flat_map(|v| match v {
468                ExtendedType::Object(node) => extract_directives!(node),
469                ExtendedType::Interface(node) => extract_directives!(node),
470                ExtendedType::InputObject(node) => extract_directives!(node),
471
472                // These types do not have fields
473                ExtendedType::Scalar(_) | ExtendedType::Union(_) | ExtendedType::Enum(_) => {
474                    Vec::new()
475                }
476            })
477            .filter(|(_, directive)| {
478                // TODO: The directive name for @override could have been aliased
479                // at the SDL level, so we'll need to extract the aliased name here instead
480                directive.name == "override" || directive.name == "federation__override"
481            });
482
483        // Now see if we have any overrides that try to reference connector subgraphs
484        for (field, directive) in override_directives {
485            // If the override directive does not have a valid `from` field, then there is
486            // no point trying to validate it, as later steps will validate the entire schema.
487            let Ok(Some(overridden_subgraph_name)) = directive
488                .argument_by_name("from", schema)
489                .map(|node| node.as_str())
490            else {
491                continue;
492            };
493
494            if schemas
495                .get(overridden_subgraph_name)
496                .is_some_and(|schema| schema.has_connectors)
497            {
498                override_errors.push(Issue {
499                        code: "OVERRIDE_ON_CONNECTOR".to_string(),
500                        message: format!(
501                            r#"Field "{field}" on subgraph "{subgraph_name}" is trying to override connector-enabled subgraph "{overridden_subgraph_name}", which is not yet supported. See https://go.apollo.dev/connectors/limitations#override-is-partially-unsupported"#,
502                        ),
503                        locations: vec![SubgraphLocation {
504                            subgraph: Some(String::from(overridden_subgraph_name)),
505                            range: directive.line_column_range(&schema.sources),
506                        }],
507                        severity: Severity::Error,
508                    });
509            }
510        }
511    }
512
513    override_errors
514}
515
516fn sanitize_connectors_issue<'a>(
517    issue: &mut Issue,
518    connector_subgraphs: impl Iterator<Item = (&'a Arc<str>, &'a Connector)>,
519) {
520    for (service_name, connector) in connector_subgraphs {
521        issue.message = issue
522            .message
523            .replace(&**service_name, connector.id.subgraph_name.as_str());
524    }
525}
526
/// Borrowed supergraph SDL text, as returned by
/// [`HybridComposition::compose_services_without_satisfiability`].
pub type SupergraphSdl<'a> = &'a str;
528
/// A successfully composed supergraph, optionally with some issues that should be addressed.
#[derive(Clone, Debug)]
pub struct PartialSuccess {
    /// SDL of the composed supergraph.
    pub supergraph_sdl: String,
    /// Issues that accompany the composed supergraph; may be empty.
    pub issues: Vec<Issue>,
}
535
536fn convert_severity(severity: ValidationSeverity) -> Severity {
537    match severity {
538        ValidationSeverity::Error => Severity::Error,
539        ValidationSeverity::Warning => Severity::Warning,
540    }
541}
542
543fn satisfiability_result_into_issues(
544    satisfiability_result: Result<SatisfiabilityResult, Issue>,
545) -> Either<impl Iterator<Item = Issue>, impl Iterator<Item = Issue>> {
546    match satisfiability_result {
547        Ok(satisfiability_result) => Either::Left(
548            satisfiability_result
549                .errors
550                .into_iter()
551                .flatten()
552                .map(Issue::from)
553                .chain(
554                    satisfiability_result
555                        .hints
556                        .into_iter()
557                        .flatten()
558                        .map(Issue::from),
559                ),
560        ),
561        Err(issue) => Either::Right(once(issue)),
562    }
563}
564
565// converts subgraph definitions to Subgraph<Upgraded> by assuming schema is valid and was already upgraded
566fn assume_subgraph_upgraded(
567    definition: SubgraphDefinition,
568) -> Result<Subgraph<Upgraded>, SubgraphError> {
569    Subgraph::parse(
570        definition.name.as_str(),
571        definition.url.as_str(),
572        definition.sdl.as_str(),
573    )
574    .and_then(|s| s.assume_expanded())
575    .map(|s| s.assume_upgraded())
576}
577
578// converts subgraph definitions to Subgraph<Validated> by assuming schema is valid and was already validated
579fn assume_subgraph_validated(
580    definition: SubgraphDefinition,
581) -> Result<Subgraph<Validated>, SubgraphError> {
582    assume_subgraph_upgraded(definition).and_then(|s| s.assume_validated())
583}