Skip to main content

apollo_composition/
lib.rs

1use apollo_compiler::{schema::ExtendedType, Schema};
2use apollo_federation::composition::{
3    expand_subgraphs, merge_subgraphs, post_merge_validations, pre_merge_validations,
4    upgrade_subgraphs_if_necessary, validate_satisfiability, CompositionFailure, Merged,
5    Supergraph,
6};
7use apollo_federation::connectors::{
8    expand::{expand_connectors, Connectors, ExpansionResult},
9    validation::{validate, Severity as ValidationSeverity, ValidationResult},
10    Connector,
11};
12use apollo_federation::error::CompositionError;
13use apollo_federation::internal_composition_api::validate_cache_tag_directives;
14use apollo_federation::subgraph::typestate::{Initial, Subgraph, Validated};
15use apollo_federation::subgraph::SubgraphError;
16use apollo_federation_types::build_plugin::PluginResult;
17use apollo_federation_types::composition::{MergeResult, SubgraphLocation};
18use apollo_federation_types::{
19    composition::{Issue, Severity},
20    javascript::SubgraphDefinition,
21};
22use std::collections::HashMap;
23use std::iter::once;
24use std::sync::Arc;
25
26/// This trait includes all the Rust-side composition logic, plus hooks for the JavaScript side.
27/// If you implement the functions in this trait to build your own JavaScript interface, then you
28/// can call [`HybridComposition::compose`] to run the complete composition process.
29///
30/// JavaScript should be implemented using `@apollo/composition@2.9.0-connectors.0`.
31#[allow(async_fn_in_trait)]
32pub trait HybridComposition {
33    /// Call the JavaScript `composeServices` function from `@apollo/composition` plus whatever
34    /// extra logic you need. Make sure to disable satisfiability, like `composeServices(definitions, {runSatisfiability: false})`
35    async fn compose_services_without_satisfiability(
36        &mut self,
37        subgraph_definitions: Vec<SubgraphDefinition>,
38    ) -> Option<SupergraphSdl<'_>>;
39
40    /// Call the JavaScript `validateSatisfiability` function from `@apollo/composition` plus whatever
41    /// extra logic you need.
42    ///
43    /// # Input
44    ///
45    /// The `validateSatisfiability` function wants an argument like `{ supergraphSdl }`. That field
46    /// should be the value that's updated when [`update_supergraph_sdl`] is called.
47    ///
48    /// # Output
49    ///
50    /// If satisfiability completes from JavaScript, either a list of hints (could be empty, the Ok case) or a list
51    /// of errors (never empty, the Err case) will be returned. If Satisfiability _can't_ be run, you can return a single error
52    /// (`Err(vec![Issue])`) indicating what went wrong.
53    async fn validate_satisfiability(&mut self) -> Result<Vec<Issue>, Vec<Issue>>;
54
55    /// Allows the Rust composition code to modify the stored supergraph SDL
56    /// (for example, to expand connectors).
57    fn update_supergraph_sdl(&mut self, supergraph_sdl: String);
58
59    /// When the Rust composition/validation code finds issues, it will call this method to add
60    /// them to the list of issues that will be returned to the user.
61    ///
62    /// It's on the implementor of this trait to convert `From<Issue>`
63    fn add_issues<Source: Iterator<Item = Issue>>(&mut self, issues: Source);
64
65    /// Runs the complete composition process, hooking into both the Rust and JavaScript implementations.
66    ///
67    /// # Asyncness
68    ///
69    /// While this function is async to allow for flexible JavaScript execution, it is a CPU-heavy task.
70    /// Take care when consuming this in an async context, as it may block longer than desired.
71    ///
72    /// # Algorithm
73    ///
74    /// 1. Run Rust-based validation on the subgraphs
75    /// 2. Call [`compose_services_without_satisfiability`] to run JavaScript-based composition
76    /// 3. Run Rust-based validation on the supergraph
77    /// 4. Call [`validate_satisfiability`] to run JavaScript-based validation on the supergraph
78    async fn compose(&mut self, subgraph_definitions: Vec<SubgraphDefinition>) {
79        // `@cacheTag` directive validation
80        if let Err(cache_tag_errors) = validate_cache_tag_in_subgraphs(&subgraph_definitions) {
81            self.add_issues(cache_tag_errors.into_iter());
82            return;
83        }
84
85        // connectors subgraph validations
86        let ConnectorsValidationResult {
87            subgraphs,
88            parsed_subgraphs,
89            hints: connector_hints,
90        } = match validate_connector_subgraphs(subgraph_definitions) {
91            Ok(results) => results,
92            Err(errors) => {
93                self.add_issues(errors.into_iter());
94                return;
95            }
96        };
97        self.add_issues(connector_hints.into_iter());
98
99        let Some(supergraph_sdl) = self
100            .compose_services_without_satisfiability(subgraphs)
101            .await
102        else {
103            return;
104        };
105
106        // Any issues with overrides are fatal since they'll cause errors in expansion,
107        // so we return early if we see any.
108        let override_errors = validate_overrides(parsed_subgraphs);
109        if !override_errors.is_empty() {
110            self.add_issues(override_errors.into_iter());
111            return;
112        }
113
114        let expansion_result = match expand_connectors(supergraph_sdl, &Default::default()) {
115            Ok(result) => result,
116            Err(err) => {
117                self.add_issues(once(Issue {
118                    code: "INTERNAL_ERROR".to_string(),
119                    message: format!(
120                        "Composition failed due to an internal error when expanding connectors, please report this: {err}"
121                    ),
122                    locations: vec![],
123                    severity: Severity::Error,
124                }));
125                return;
126            }
127        };
128        match expansion_result {
129            ExpansionResult::Expanded {
130                raw_sdl,
131                connectors: Connectors {
132                    by_service_name, ..
133                },
134                ..
135            } => {
136                let original_supergraph_sdl = supergraph_sdl.to_string();
137                self.update_supergraph_sdl(raw_sdl);
138                let satisfiability_result = self.validate_satisfiability().await;
139                self.add_issues(
140                    satisfiability_result_into_issues(satisfiability_result).map(|mut issue| {
141                        sanitize_connectors_issue(&mut issue, by_service_name.iter());
142                        issue
143                    }),
144                );
145
146                self.update_supergraph_sdl(original_supergraph_sdl);
147            }
148            ExpansionResult::Unchanged => {
149                let satisfiability_result = self.validate_satisfiability().await;
150                self.add_issues(satisfiability_result_into_issues(satisfiability_result));
151            }
152        }
153    }
154
155    /// <div class="warning">*** EXPERIMENTAL ***</div>
156    ///
157    /// Runs the composition process with granular composition phases that allow replacing individual
158    /// steps with Rust and/or JavaScript implementations.
159    ///
160    /// 1. <connectors> subgraph validation
161    /// 2. Initialize subgraphs - parses SDL into a GraphQL schema
162    /// 3. Expands subgraphs - adds all missing federation definitions
163    /// 4. Upgrade subgraphs - upgrades fed v1 schemas to fed v2
164    /// 5. Validate subgraphs
165    /// 6. Pre-merge validations (includes connectors validations)
166    /// 7. Merge subgraphs into a supergrpah
167    /// 8. Post merge validations
168    /// 9. <connectors> expand supergraph
169    /// 10. Validate satisfiability
170    ///
171    /// In case of a composition failure, we return a list of errors from the current composition
172    /// phase.
173    async fn experimental_compose(
174        mut self,
175        subgraph_definitions: Vec<SubgraphDefinition>,
176    ) -> Result<PluginResult, Vec<Issue>>
177    where
178        Self: Sized,
179    {
180        // connectors validations
181        // Any issues with overrides are fatal since they'll cause errors in expansion,
182        // so we return early if we see any.
183        // TODO those validations should be moved to subgraph validations in the apollo-federation crate instead
184        let ConnectorsValidationResult {
185            subgraphs: connected_subgraphs,
186            parsed_subgraphs,
187            hints: connector_hints,
188        } = validate_connector_subgraphs(subgraph_definitions)?;
189
190        let merged_supergraph =
191            Self::experimental_compose_without_satisfiability(connected_subgraphs)
192                .map_err(failure_to_issues)?;
193
194        // Extra connectors validation after merging.
195        // - So that connectors-related override errors will only be reported if merging was
196        //   successful.
197        let override_errors = validate_overrides(parsed_subgraphs);
198        if !override_errors.is_empty() {
199            return Err(override_errors);
200        }
201
202        // expand connectors as needed
203        let supergraph_sdl = merged_supergraph.schema().schema().to_string();
204        let merge_hints: Vec<Issue> = merged_supergraph
205            .hints()
206            .iter()
207            .map(|hint| hint.clone().into())
208            .collect();
209        let expansion_result = match expand_connectors(&supergraph_sdl, &Default::default()) {
210            Ok(result) => result,
211            Err(err) => {
212                return Err(vec![err.into()]);
213            }
214        };
215
216        // verify satisfiability
217        match expansion_result {
218            ExpansionResult::Expanded {
219                raw_sdl,
220                connectors: Connectors {
221                    by_service_name, ..
222                },
223                ..
224            } => {
225                match self
226                    .experimental_validate_satisfiability(raw_sdl.as_str())
227                    .await
228                {
229                    Ok(sat_hints) => {
230                        let mut composition_hints = merge_hints;
231                        composition_hints.extend(sat_hints);
232
233                        let mut build_messages: Vec<_> =
234                            connector_hints.into_iter().map(|h| h.into()).collect();
235                        build_messages.extend(composition_hints.into_iter().map(|h| {
236                            let mut issue = Into::<Issue>::into(h);
237                            sanitize_connectors_issue(&mut issue, by_service_name.iter());
238                            issue.into()
239                        }));
240                        Ok(PluginResult::new(Ok(supergraph_sdl), build_messages))
241                    }
242                    Err(mut issues) => {
243                        issues.extend(merge_hints);
244                        Err(issues
245                            .into_iter()
246                            .map(|mut issue| {
247                                sanitize_connectors_issue(&mut issue, by_service_name.iter());
248                                issue
249                            })
250                            .collect())
251                    }
252                }
253            }
254            ExpansionResult::Unchanged => {
255                match self
256                    .experimental_validate_satisfiability(supergraph_sdl.as_str())
257                    .await
258                {
259                    Ok(sat_hints) => {
260                        let mut hints = merge_hints;
261                        hints.extend(sat_hints);
262
263                        let build_messages: Vec<_> = hints
264                            .into_iter()
265                            .map(|h| Into::<Issue>::into(h).into())
266                            .collect();
267                        Ok(PluginResult::new(Ok(supergraph_sdl), build_messages))
268                    }
269                    Err(mut issues) => {
270                        issues.extend(merge_hints);
271                        Err(issues)
272                    }
273                }
274            }
275        }
276    }
277
278    /// Maps to buildSubgraph & upgradeSubgraphsIfNecessary and performs following steps
279    ///
280    /// 1. Parses raw SDL schemas into Subgraph<Initial>
281    /// 2. Adds missing federation definitions to the subgraph schemas
282    /// 3. Upgrades federation v1 subgraphs to federation v2 schemas.
283    ///    This is a no-op if it is already a federation v2 subgraph.
284    /// 4. Validates the expanded/upgraded subgraph schemas.
285    async fn experimental_upgrade_subgraphs(
286        &mut self,
287        subgraphs: Vec<SubgraphDefinition>,
288    ) -> Result<Vec<SubgraphDefinition>, Vec<Issue>> {
289        let mut issues: Vec<Issue> = vec![];
290        let initial: Vec<Subgraph<Initial>> = subgraphs
291            .into_iter()
292            .map(|s| s.try_into())
293            .filter_map(|r| {
294                r.map_err(|e: SubgraphError| issues.extend(convert_subgraph_error_to_issues(e)))
295                    .ok()
296            })
297            .collect();
298        if !issues.is_empty() {
299            return Err(issues);
300        }
301        expand_subgraphs(initial)
302            .and_then(|subgraphs| {
303                upgrade_subgraphs_if_necessary(subgraphs).map_err(CompositionFailure::from_errors)
304            })
305            .map(|subgraphs| subgraphs.into_iter().map(|s| s.into()).collect())
306            .map_err(failure_to_issues)
307    }
308
309    /// In case of a merge failure, returns a list of errors.
310    async fn experimental_merge_subgraphs(
311        &mut self,
312        subgraphs: Vec<SubgraphDefinition>,
313    ) -> Result<MergeResult, Vec<Issue>> {
314        let mut subgraph_errors = vec![];
315        let validated: Vec<Subgraph<Validated>> = subgraphs
316            .into_iter()
317            .map(assume_subgraph_validated)
318            .filter_map(|r| {
319                r.map_err(|e| subgraph_errors.extend(convert_subgraph_error_to_issues(e)))
320                    .ok()
321            })
322            .collect();
323        if !subgraph_errors.is_empty() {
324            // this should never happen
325            return Err(subgraph_errors);
326        }
327        pre_merge_validations(&validated).map_err(failure_to_issues)?;
328        let supergraph =
329            merge_subgraphs(validated, &Default::default()).map_err(failure_to_issues)?;
330        post_merge_validations(&supergraph).map_err(failure_to_issues)?;
331        let hints = supergraph
332            .hints()
333            .iter()
334            .map(|hint| hint.clone().into())
335            .collect();
336        Ok(MergeResult {
337            supergraph: supergraph.schema().schema().to_string(),
338            hints,
339        })
340    }
341
342    /// If successful, returns a list of hints (possibly empty); Otherwise, returns a list of
343    /// errors and hints (as WARN-level issues).
344    async fn experimental_validate_satisfiability(
345        &mut self,
346        supergraph_sdl: &str,
347    ) -> Result<Vec<Issue>, Vec<Issue>> {
348        let supergraph = Supergraph::parse(supergraph_sdl).map_err(|e| vec![Issue::from(e)])?;
349        validate_satisfiability(supergraph, &Default::default())
350            .map(|s| s.hints().iter().map(|h| h.clone().into()).collect())
351            .map_err(failure_to_issues)
352    }
353
354    fn experimental_compose_without_satisfiability(
355        subgraphs: Vec<SubgraphDefinition>,
356    ) -> Result<Supergraph<Merged>, CompositionFailure> {
357        let mut errors: Vec<CompositionError> = vec![];
358        let initial: Vec<Subgraph<Initial>> = subgraphs
359            .into_iter()
360            .map(|s| s.try_into())
361            .filter_map(|r| {
362                r.map_err(|e: SubgraphError| errors.extend(e.to_composition_errors()))
363                    .ok()
364            })
365            .collect();
366        if !errors.is_empty() {
367            return Err(CompositionFailure::from_errors(errors));
368        }
369
370        let expanded_subgraphs = expand_subgraphs(initial)?;
371        let validated_subgraphs = upgrade_subgraphs_if_necessary(expanded_subgraphs)
372            .map_err(CompositionFailure::from_errors)?;
373        pre_merge_validations(&validated_subgraphs)?;
374        let supergraph = merge_subgraphs(validated_subgraphs, &Default::default())?;
375        post_merge_validations(&supergraph)?;
376        Ok(supergraph)
377    }
378}
379
380struct SubgraphSchema {
381    schema: Schema,
382    has_connectors: bool,
383}
384
385struct ConnectorsValidationResult {
386    subgraphs: Vec<SubgraphDefinition>,
387    parsed_subgraphs: HashMap<String, SubgraphSchema>,
388    hints: Vec<Issue>,
389}
390// TODO this should eventually move under expand/validate subgraph logic
391fn validate_connector_subgraphs(
392    subgraph_definitions: Vec<SubgraphDefinition>,
393) -> Result<ConnectorsValidationResult, Vec<Issue>> {
394    let mut subgraph_validation_errors = Vec::new();
395    let mut subgraph_validation_hints = Vec::new();
396    let mut parsed_schemas = HashMap::new();
397    let subgraph_definitions = subgraph_definitions
398        .into_iter()
399        .map(|mut subgraph| {
400            let ValidationResult {
401                errors,
402                has_connectors,
403                schema,
404                transformed,
405            } = validate(subgraph.sdl, &subgraph.name);
406            subgraph.sdl = transformed;
407            for error in errors {
408                let issue = Issue {
409                    code: error.code.to_string(),
410                    message: error.message,
411                    locations: error
412                        .locations
413                        .into_iter()
414                        .map(|range| SubgraphLocation {
415                            subgraph: Some(subgraph.name.clone()),
416                            range: Some(range),
417                        })
418                        .collect(),
419                    severity: convert_severity(error.code.severity()),
420                };
421                if issue.severity == Severity::Error {
422                    subgraph_validation_errors.push(issue);
423                } else {
424                    subgraph_validation_hints.push(issue);
425                }
426            }
427            parsed_schemas.insert(
428                subgraph.name.clone(),
429                SubgraphSchema {
430                    schema,
431                    has_connectors,
432                },
433            );
434            subgraph
435        })
436        .collect();
437
438    if !subgraph_validation_errors.is_empty() {
439        return Err(subgraph_validation_errors);
440    }
441    Ok(ConnectorsValidationResult {
442        subgraphs: subgraph_definitions,
443        parsed_subgraphs: parsed_schemas,
444        hints: subgraph_validation_hints,
445    })
446}
447
448/// Validate overrides for connector-related subgraphs
449///
450/// Overrides mess with the supergraph in ways that can be difficult to detect when
451/// expanding connectors; the supergraph may omit overridden fields and other shenanigans.
452/// To allow for a better developer experience, we check here if any connector-enabled subgraphs
453/// have fields overridden.
454fn validate_overrides(schemas: HashMap<String, SubgraphSchema>) -> Vec<Issue> {
455    let mut override_errors = Vec::new();
456    for (subgraph_name, SubgraphSchema { schema, .. }) in &schemas {
457        // We need to grab all fields in the schema since only fields can have the @override
458        // directive attached
459        macro_rules! extract_directives {
460            ($node:ident) => {
461                $node
462                    .fields
463                    .iter()
464                    .flat_map(|(name, field)| {
465                        field
466                            .directives
467                            .iter()
468                            .map(move |d| (format!("{}.{}", $node.name, name), d))
469                    })
470                    .collect::<Vec<_>>()
471            };
472        }
473
474        let override_directives = schema
475            .types
476            .values()
477            .flat_map(|v| match v {
478                ExtendedType::Object(node) => extract_directives!(node),
479                ExtendedType::Interface(node) => extract_directives!(node),
480                ExtendedType::InputObject(node) => extract_directives!(node),
481
482                // These types do not have fields
483                ExtendedType::Scalar(_) | ExtendedType::Union(_) | ExtendedType::Enum(_) => {
484                    Vec::new()
485                }
486            })
487            .filter(|(_, directive)| {
488                // TODO: The directive name for @override could have been aliased
489                // at the SDL level, so we'll need to extract the aliased name here instead
490                directive.name == "override" || directive.name == "federation__override"
491            });
492
493        // Now see if we have any overrides that try to reference connector subgraphs
494        for (field, directive) in override_directives {
495            // If the override directive does not have a valid `from` field, then there is
496            // no point trying to validate it, as later steps will validate the entire schema.
497            let Ok(Some(overridden_subgraph_name)) = directive
498                .argument_by_name("from", schema)
499                .map(|node| node.as_str())
500            else {
501                continue;
502            };
503
504            if schemas
505                .get(overridden_subgraph_name)
506                .is_some_and(|schema| schema.has_connectors)
507            {
508                override_errors.push(Issue {
509                        code: "OVERRIDE_ON_CONNECTOR".to_string(),
510                        message: format!(
511                            r#"Field "{field}" on subgraph "{subgraph_name}" is trying to override connector-enabled subgraph "{overridden_subgraph_name}", which is not yet supported. See https://go.apollo.dev/connectors/limitations#override-is-partially-unsupported"#,
512                        ),
513                        locations: vec![SubgraphLocation {
514                            subgraph: Some(String::from(overridden_subgraph_name)),
515                            range: directive.line_column_range(&schema.sources),
516                        }],
517                        severity: Severity::Error,
518                    });
519            }
520        }
521    }
522
523    override_errors
524}
525
526fn sanitize_connectors_issue<'a>(
527    issue: &mut Issue,
528    connector_subgraphs: impl Iterator<Item = (&'a Arc<str>, &'a Connector)>,
529) {
530    for (service_name, connector) in connector_subgraphs {
531        issue.message = issue
532            .message
533            .replace(&**service_name, connector.id.subgraph_name.as_str());
534    }
535}
536
537fn validate_cache_tag_in_subgraphs(
538    subgraph_definitions: &[SubgraphDefinition],
539) -> Result<(), Vec<Issue>> {
540    let mut issues = Vec::new();
541    for subgraph_def in subgraph_definitions {
542        match validate_cache_tag_directives(
543            &subgraph_def.name,
544            &subgraph_def.url,
545            &subgraph_def.sdl,
546        ) {
547            Err(_err) => {
548                // Ignore internal errors as they must be GraphQL/Federation validation errors,
549                // which will be reported during the main validation.
550                break;
551            }
552            Ok(res) => {
553                if !res.errors.is_empty() {
554                    issues.extend(res.errors.into_iter().map(|err| {
555                        Issue {
556                            code: err.code().to_string(),
557                            message: err.message().to_string(),
558                            locations: err
559                                .locations()
560                                .iter()
561                                .cloned()
562                                .map(|range| SubgraphLocation {
563                                    subgraph: Some(subgraph_def.name.clone()),
564                                    range: Some(range),
565                                })
566                                .collect(),
567                            severity: Severity::Error,
568                        }
569                    }));
570                }
571            }
572        }
573    }
574    if !issues.is_empty() {
575        Err(issues)
576    } else {
577        Ok(())
578    }
579}
580
581pub type SupergraphSdl<'a> = &'a str;
582
583/// A successfully composed supergraph, optionally with some issues that should be addressed.
584#[derive(Clone, Debug)]
585pub struct PartialSuccess {
586    pub supergraph_sdl: String,
587    pub issues: Vec<Issue>,
588}
589
590fn convert_severity(severity: ValidationSeverity) -> Severity {
591    match severity {
592        ValidationSeverity::Error => Severity::Error,
593        ValidationSeverity::Warning => Severity::Warning,
594    }
595}
596
597fn satisfiability_result_into_issues(
598    result: Result<Vec<Issue>, Vec<Issue>>,
599) -> impl Iterator<Item = Issue> {
600    match result {
601        Ok(hints) => hints.into_iter(),
602        Err(errors) => errors.into_iter(),
603    }
604}
605
606fn failure_to_issues(failure: CompositionFailure) -> Vec<Issue> {
607    let mut issues: Vec<Issue> = failure.errors.into_iter().map(Issue::from).collect();
608    issues.extend(failure.hints.into_iter().map(Issue::from));
609    issues
610}
611
612// converts subgraph definitions to Subgraph<Validated> by assuming schema is already
613// expanded/upgraded/validated
614fn assume_subgraph_validated(
615    definition: SubgraphDefinition,
616) -> Result<Subgraph<Validated>, SubgraphError> {
617    Subgraph::parse(
618        definition.name.as_str(),
619        definition.url.as_str(),
620        definition.sdl.as_str(),
621    )
622    .and_then(|s| s.assume_expanded())
623    .map(|s| s.assume_validated())
624}
625
626fn convert_subgraph_error_to_issues(error: SubgraphError) -> Vec<Issue> {
627    error
628        .to_composition_errors()
629        .map(|err| err.into())
630        .collect()
631}