apollo_composition/
lib.rs

1use apollo_compiler::{schema::ExtendedType, Schema};
2use apollo_federation::composition::{
3    expand_subgraphs, merge_subgraphs, post_merge_validations, pre_merge_validations,
4    upgrade_subgraphs_if_necessary, validate_satisfiability, Supergraph,
5};
6use apollo_federation::connectors::{
7    expand::{expand_connectors, Connectors, ExpansionResult},
8    validation::{validate, Severity as ValidationSeverity, ValidationResult},
9    Connector,
10};
11use apollo_federation::internal_composition_api::validate_cache_tag_directives;
12use apollo_federation::subgraph::typestate::{Initial, Subgraph, Validated};
13use apollo_federation::subgraph::SubgraphError;
14use apollo_federation_types::build_plugin::PluginResult;
15use apollo_federation_types::composition::{MergeResult, SubgraphLocation};
16use apollo_federation_types::{
17    composition::{Issue, Severity},
18    javascript::SubgraphDefinition,
19};
20use std::collections::HashMap;
21use std::iter::once;
22use std::sync::Arc;
23
24/// This trait includes all the Rust-side composition logic, plus hooks for the JavaScript side.
25/// If you implement the functions in this trait to build your own JavaScript interface, then you
26/// can call [`HybridComposition::compose`] to run the complete composition process.
27///
28/// JavaScript should be implemented using `@apollo/composition@2.9.0-connectors.0`.
29#[allow(async_fn_in_trait)]
30pub trait HybridComposition {
31    /// Call the JavaScript `composeServices` function from `@apollo/composition` plus whatever
32    /// extra logic you need. Make sure to disable satisfiability, like `composeServices(definitions, {runSatisfiability: false})`
33    async fn compose_services_without_satisfiability(
34        &mut self,
35        subgraph_definitions: Vec<SubgraphDefinition>,
36    ) -> Option<SupergraphSdl<'_>>;
37
38    /// Call the JavaScript `validateSatisfiability` function from `@apollo/composition` plus whatever
39    /// extra logic you need.
40    ///
41    /// # Input
42    ///
43    /// The `validateSatisfiability` function wants an argument like `{ supergraphSdl }`. That field
44    /// should be the value that's updated when [`update_supergraph_sdl`] is called.
45    ///
46    /// # Output
47    ///
48    /// If satisfiability completes from JavaScript, either a list of hints (could be empty, the Ok case) or a list
49    /// of errors (never empty, the Err case) will be returned. If Satisfiability _can't_ be run, you can return a single error
50    /// (`Err(vec![Issue])`) indicating what went wrong.
51    async fn validate_satisfiability(&mut self) -> Result<Vec<Issue>, Vec<Issue>>;
52
53    /// Allows the Rust composition code to modify the stored supergraph SDL
54    /// (for example, to expand connectors).
55    fn update_supergraph_sdl(&mut self, supergraph_sdl: String);
56
57    /// When the Rust composition/validation code finds issues, it will call this method to add
58    /// them to the list of issues that will be returned to the user.
59    ///
60    /// It's on the implementor of this trait to convert `From<Issue>`
61    fn add_issues<Source: Iterator<Item = Issue>>(&mut self, issues: Source);
62
63    /// Runs the complete composition process, hooking into both the Rust and JavaScript implementations.
64    ///
65    /// # Asyncness
66    ///
67    /// While this function is async to allow for flexible JavaScript execution, it is a CPU-heavy task.
68    /// Take care when consuming this in an async context, as it may block longer than desired.
69    ///
70    /// # Algorithm
71    ///
72    /// 1. Run Rust-based validation on the subgraphs
73    /// 2. Call [`compose_services_without_satisfiability`] to run JavaScript-based composition
74    /// 3. Run Rust-based validation on the supergraph
75    /// 4. Call [`validate_satisfiability`] to run JavaScript-based validation on the supergraph
76    async fn compose(&mut self, subgraph_definitions: Vec<SubgraphDefinition>) {
77        // `@cacheTag` directive validation
78        if let Err(cache_tag_errors) = validate_cache_tag_in_subgraphs(&subgraph_definitions) {
79            self.add_issues(cache_tag_errors.into_iter());
80            return;
81        }
82
83        // connectors subgraph validations
84        let ConnectorsValidationResult {
85            subgraphs,
86            parsed_subgraphs,
87            hints: connector_hints,
88        } = match validate_connector_subgraphs(subgraph_definitions) {
89            Ok(results) => results,
90            Err(errors) => {
91                self.add_issues(errors.into_iter());
92                return;
93            }
94        };
95        self.add_issues(connector_hints.into_iter());
96
97        let Some(supergraph_sdl) = self
98            .compose_services_without_satisfiability(subgraphs)
99            .await
100        else {
101            return;
102        };
103
104        // Any issues with overrides are fatal since they'll cause errors in expansion,
105        // so we return early if we see any.
106        let override_errors = validate_overrides(parsed_subgraphs);
107        if !override_errors.is_empty() {
108            self.add_issues(override_errors.into_iter());
109            return;
110        }
111
112        let expansion_result = match expand_connectors(supergraph_sdl, &Default::default()) {
113            Ok(result) => result,
114            Err(err) => {
115                self.add_issues(once(Issue {
116                    code: "INTERNAL_ERROR".to_string(),
117                    message: format!(
118                        "Composition failed due to an internal error when expanding connectors, please report this: {err}"
119                    ),
120                    locations: vec![],
121                    severity: Severity::Error,
122                }));
123                return;
124            }
125        };
126        match expansion_result {
127            ExpansionResult::Expanded {
128                raw_sdl,
129                connectors: Connectors {
130                    by_service_name, ..
131                },
132                ..
133            } => {
134                let original_supergraph_sdl = supergraph_sdl.to_string();
135                self.update_supergraph_sdl(raw_sdl);
136                let satisfiability_result = self.validate_satisfiability().await;
137                self.add_issues(
138                    satisfiability_result_into_issues(satisfiability_result).map(|mut issue| {
139                        sanitize_connectors_issue(&mut issue, by_service_name.iter());
140                        issue
141                    }),
142                );
143
144                self.update_supergraph_sdl(original_supergraph_sdl);
145            }
146            ExpansionResult::Unchanged => {
147                let satisfiability_result = self.validate_satisfiability().await;
148                self.add_issues(satisfiability_result_into_issues(satisfiability_result));
149            }
150        }
151    }
152
153    /// <div class="warning">*** EXPERIMENTAL ***</div>
154    ///
155    /// Runs the composition process with granular composition phases that allow replacing individual
156    /// steps with Rust and/or JavaScript implementations.
157    ///
158    /// 1. <connectors> subgraph validation
159    /// 2. Initialize subgraphs - parses SDL into a GraphQL schema
160    /// 3. Expands subgraphs - adds all missing federation definitions
161    /// 4. Upgrade subgraphs - upgrades fed v1 schemas to fed v2
162    /// 5. Validate subgraphs
163    /// 6. Pre-merge validations (includes connectors validations)
164    /// 7. Merge subgraphs into a supergrpah
165    /// 8. Post merge validations
166    /// 9. <connectors> expand supergraph
167    /// 10. Validate satisfiability
168    ///
169    /// In case of a composition failure, we return a list of errors from the current composition
170    /// phase.
171    async fn experimental_compose(
172        mut self,
173        subgraph_definitions: Vec<SubgraphDefinition>,
174    ) -> Result<PluginResult, Vec<Issue>>
175    where
176        Self: Sized,
177    {
178        // `@cacheTag` directive validation
179        validate_cache_tag_in_subgraphs(&subgraph_definitions)?;
180
181        // connectors validations
182        // Any issues with overrides are fatal since they'll cause errors in expansion,
183        // so we return early if we see any.
184        // TODO those validations should be moved to subgraph validations in the apollo-federation crate instead
185        let ConnectorsValidationResult {
186            subgraphs: connected_subgraphs,
187            parsed_subgraphs,
188            hints: connector_hints,
189        } = validate_connector_subgraphs(subgraph_definitions)?;
190
191        let upgraded_subgraphs = self
192            .experimental_upgrade_subgraphs(connected_subgraphs)
193            .await?;
194
195        // merge
196        let merge_result = self
197            .experimental_merge_subgraphs(upgraded_subgraphs)
198            .await?;
199
200        // Extra connectors validation after merging.
201        // - So that connectors-related override errors will only be reported if merging was
202        //   successful.
203        let override_errors = validate_overrides(parsed_subgraphs);
204        if !override_errors.is_empty() {
205            return Err(override_errors);
206        }
207
208        // expand connectors as needed
209        let supergraph_sdl = merge_result.supergraph.clone();
210        let expansion_result = match expand_connectors(&supergraph_sdl, &Default::default()) {
211            Ok(result) => result,
212            Err(err) => {
213                return Err(vec![err.into()]);
214            }
215        };
216
217        // verify satisfiability
218        match expansion_result {
219            ExpansionResult::Expanded {
220                raw_sdl,
221                connectors: Connectors {
222                    by_service_name, ..
223                },
224                ..
225            } => {
226                self.experimental_validate_satisfiability(raw_sdl.as_str())
227                    .await
228                    .map(|s| {
229                        let mut composition_hints = merge_result.hints;
230                        composition_hints.extend(s);
231
232                        let mut build_messages: Vec<_> =
233                            connector_hints.into_iter().map(|h| h.into()).collect();
234                        build_messages.extend(composition_hints.into_iter().map(|h| {
235                            let mut issue = Into::<Issue>::into(h);
236                            sanitize_connectors_issue(&mut issue, by_service_name.iter());
237                            issue.into()
238                        }));
239                        // return original supergraph
240                        PluginResult::new(Ok(supergraph_sdl), build_messages)
241                    })
242                    .map_err(|err| {
243                        err.into_iter()
244                            .map(|mut issue| {
245                                sanitize_connectors_issue(&mut issue, by_service_name.iter());
246                                issue
247                            })
248                            .collect()
249                    })
250            }
251            ExpansionResult::Unchanged => self
252                .experimental_validate_satisfiability(supergraph_sdl.as_str())
253                .await
254                .map(|s| {
255                    let mut hints = merge_result.hints;
256                    hints.extend(s);
257
258                    let build_messages: Vec<_> = hints
259                        .into_iter()
260                        .map(|h| Into::<Issue>::into(h).into())
261                        .collect();
262                    PluginResult::new(Ok(supergraph_sdl), build_messages)
263                }),
264        }
265    }
266
267    /// Maps to buildSubgraph & upgradeSubgraphsIfNecessary and performs following steps
268    ///
269    /// 1. Parses raw SDL schemas into Subgraph<Initial>
270    /// 2. Adds missing federation definitions to the subgraph schemas
271    /// 3. Upgrades federation v1 subgraphs to federation v2 schemas.
272    ///    This is a no-op if it is already a federation v2 subgraph.
273    /// 4. Validates the expanded/upgraded subgraph schemas.
274    async fn experimental_upgrade_subgraphs(
275        &mut self,
276        subgraphs: Vec<SubgraphDefinition>,
277    ) -> Result<Vec<SubgraphDefinition>, Vec<Issue>> {
278        let mut issues: Vec<Issue> = vec![];
279        let initial: Vec<Subgraph<Initial>> = subgraphs
280            .into_iter()
281            .map(|s| s.try_into())
282            .filter_map(|r| {
283                r.map_err(|e: SubgraphError| issues.extend(convert_subgraph_error_to_issues(e)))
284                    .ok()
285            })
286            .collect();
287        if !issues.is_empty() {
288            return Err(issues);
289        }
290        expand_subgraphs(initial)
291            .and_then(upgrade_subgraphs_if_necessary)
292            .map(|subgraphs| subgraphs.into_iter().map(|s| s.into()).collect())
293            .map_err(|errors| errors.into_iter().map(Issue::from).collect::<Vec<_>>())
294    }
295
296    /// In case of a merge failure, returns a list of errors.
297    async fn experimental_merge_subgraphs(
298        &mut self,
299        subgraphs: Vec<SubgraphDefinition>,
300    ) -> Result<MergeResult, Vec<Issue>> {
301        let mut subgraph_errors = vec![];
302        let validated: Vec<Subgraph<Validated>> = subgraphs
303            .into_iter()
304            .map(assume_subgraph_validated)
305            .filter_map(|r| {
306                r.map_err(|e| subgraph_errors.extend(convert_subgraph_error_to_issues(e)))
307                    .ok()
308            })
309            .collect();
310        if !subgraph_errors.is_empty() {
311            // this should never happen
312            return Err(subgraph_errors);
313        }
314        pre_merge_validations(&validated)
315            .map_err(|errors| errors.into_iter().map(Issue::from).collect::<Vec<_>>())?;
316        let supergraph = merge_subgraphs(validated)
317            .map_err(|errors| errors.into_iter().map(Issue::from).collect::<Vec<_>>())?;
318        post_merge_validations(&supergraph)
319            .map_err(|errors| errors.into_iter().map(Issue::from).collect::<Vec<_>>())?;
320        let hints = supergraph
321            .hints()
322            .iter()
323            .map(|hint| hint.clone().into())
324            .collect();
325        Ok(MergeResult {
326            supergraph: supergraph.schema().to_string(),
327            hints,
328        })
329    }
330
331    /// If successful, returns a list of hints (possibly empty); Otherwise, returns a list of errors.
332    async fn experimental_validate_satisfiability(
333        &mut self,
334        supergraph_sdl: &str,
335    ) -> Result<Vec<Issue>, Vec<Issue>> {
336        let supergraph = Supergraph::parse(supergraph_sdl).map_err(|e| vec![Issue::from(e)])?;
337        validate_satisfiability(supergraph)
338            .map(|s| s.hints().iter().map(|h| h.clone().into()).collect())
339            .map_err(|errors| errors.into_iter().map(Issue::from).collect::<Vec<_>>())
340    }
341}
342
343struct SubgraphSchema {
344    schema: Schema,
345    has_connectors: bool,
346}
347
348struct ConnectorsValidationResult {
349    subgraphs: Vec<SubgraphDefinition>,
350    parsed_subgraphs: HashMap<String, SubgraphSchema>,
351    hints: Vec<Issue>,
352}
353// TODO this should eventually move under expand/validate subgraph logic
354fn validate_connector_subgraphs(
355    subgraph_definitions: Vec<SubgraphDefinition>,
356) -> Result<ConnectorsValidationResult, Vec<Issue>> {
357    let mut subgraph_validation_errors = Vec::new();
358    let mut subgraph_validation_hints = Vec::new();
359    let mut parsed_schemas = HashMap::new();
360    let subgraph_definitions = subgraph_definitions
361        .into_iter()
362        .map(|mut subgraph| {
363            let ValidationResult {
364                errors,
365                has_connectors,
366                schema,
367                transformed,
368            } = validate(subgraph.sdl, &subgraph.name);
369            subgraph.sdl = transformed;
370            for error in errors {
371                let issue = Issue {
372                    code: error.code.to_string(),
373                    message: error.message,
374                    locations: error
375                        .locations
376                        .into_iter()
377                        .map(|range| SubgraphLocation {
378                            subgraph: Some(subgraph.name.clone()),
379                            range: Some(range),
380                        })
381                        .collect(),
382                    severity: convert_severity(error.code.severity()),
383                };
384                if issue.severity == Severity::Error {
385                    subgraph_validation_errors.push(issue);
386                } else {
387                    subgraph_validation_hints.push(issue);
388                }
389            }
390            parsed_schemas.insert(
391                subgraph.name.clone(),
392                SubgraphSchema {
393                    schema,
394                    has_connectors,
395                },
396            );
397            subgraph
398        })
399        .collect();
400
401    if !subgraph_validation_errors.is_empty() {
402        return Err(subgraph_validation_errors);
403    }
404    Ok(ConnectorsValidationResult {
405        subgraphs: subgraph_definitions,
406        parsed_subgraphs: parsed_schemas,
407        hints: subgraph_validation_hints,
408    })
409}
410
411/// Validate overrides for connector-related subgraphs
412///
413/// Overrides mess with the supergraph in ways that can be difficult to detect when
414/// expanding connectors; the supergraph may omit overridden fields and other shenanigans.
415/// To allow for a better developer experience, we check here if any connector-enabled subgraphs
416/// have fields overridden.
417fn validate_overrides(schemas: HashMap<String, SubgraphSchema>) -> Vec<Issue> {
418    let mut override_errors = Vec::new();
419    for (subgraph_name, SubgraphSchema { schema, .. }) in &schemas {
420        // We need to grab all fields in the schema since only fields can have the @override
421        // directive attached
422        macro_rules! extract_directives {
423            ($node:ident) => {
424                $node
425                    .fields
426                    .iter()
427                    .flat_map(|(name, field)| {
428                        field
429                            .directives
430                            .iter()
431                            .map(move |d| (format!("{}.{}", $node.name, name), d))
432                    })
433                    .collect::<Vec<_>>()
434            };
435        }
436
437        let override_directives = schema
438            .types
439            .values()
440            .flat_map(|v| match v {
441                ExtendedType::Object(node) => extract_directives!(node),
442                ExtendedType::Interface(node) => extract_directives!(node),
443                ExtendedType::InputObject(node) => extract_directives!(node),
444
445                // These types do not have fields
446                ExtendedType::Scalar(_) | ExtendedType::Union(_) | ExtendedType::Enum(_) => {
447                    Vec::new()
448                }
449            })
450            .filter(|(_, directive)| {
451                // TODO: The directive name for @override could have been aliased
452                // at the SDL level, so we'll need to extract the aliased name here instead
453                directive.name == "override" || directive.name == "federation__override"
454            });
455
456        // Now see if we have any overrides that try to reference connector subgraphs
457        for (field, directive) in override_directives {
458            // If the override directive does not have a valid `from` field, then there is
459            // no point trying to validate it, as later steps will validate the entire schema.
460            let Ok(Some(overridden_subgraph_name)) = directive
461                .argument_by_name("from", schema)
462                .map(|node| node.as_str())
463            else {
464                continue;
465            };
466
467            if schemas
468                .get(overridden_subgraph_name)
469                .is_some_and(|schema| schema.has_connectors)
470            {
471                override_errors.push(Issue {
472                        code: "OVERRIDE_ON_CONNECTOR".to_string(),
473                        message: format!(
474                            r#"Field "{field}" on subgraph "{subgraph_name}" is trying to override connector-enabled subgraph "{overridden_subgraph_name}", which is not yet supported. See https://go.apollo.dev/connectors/limitations#override-is-partially-unsupported"#,
475                        ),
476                        locations: vec![SubgraphLocation {
477                            subgraph: Some(String::from(overridden_subgraph_name)),
478                            range: directive.line_column_range(&schema.sources),
479                        }],
480                        severity: Severity::Error,
481                    });
482            }
483        }
484    }
485
486    override_errors
487}
488
489fn sanitize_connectors_issue<'a>(
490    issue: &mut Issue,
491    connector_subgraphs: impl Iterator<Item = (&'a Arc<str>, &'a Connector)>,
492) {
493    for (service_name, connector) in connector_subgraphs {
494        issue.message = issue
495            .message
496            .replace(&**service_name, connector.id.subgraph_name.as_str());
497    }
498}
499
500fn validate_cache_tag_in_subgraphs(
501    subgraph_definitions: &[SubgraphDefinition],
502) -> Result<(), Vec<Issue>> {
503    let mut issues = Vec::new();
504    for subgraph_def in subgraph_definitions {
505        match validate_cache_tag_directives(
506            &subgraph_def.name,
507            &subgraph_def.url,
508            &subgraph_def.sdl,
509        ) {
510            Err(_err) => {
511                // Ignore internal errors as they must be GraphQL/Federation validation errors,
512                // which will be reported during the main validation.
513                break;
514            }
515            Ok(res) => {
516                if !res.errors.is_empty() {
517                    issues.extend(res.errors.into_iter().map(|err| {
518                        Issue {
519                            code: err.code(),
520                            message: err.message(),
521                            locations: err
522                                .locations
523                                .into_iter()
524                                .map(|range| SubgraphLocation {
525                                    subgraph: Some(subgraph_def.name.clone()),
526                                    range: Some(range),
527                                })
528                                .collect(),
529                            severity: Severity::Error,
530                        }
531                    }));
532                }
533            }
534        }
535    }
536    if !issues.is_empty() {
537        Err(issues)
538    } else {
539        Ok(())
540    }
541}
542
543pub type SupergraphSdl<'a> = &'a str;
544
545/// A successfully composed supergraph, optionally with some issues that should be addressed.
546#[derive(Clone, Debug)]
547pub struct PartialSuccess {
548    pub supergraph_sdl: String,
549    pub issues: Vec<Issue>,
550}
551
552fn convert_severity(severity: ValidationSeverity) -> Severity {
553    match severity {
554        ValidationSeverity::Error => Severity::Error,
555        ValidationSeverity::Warning => Severity::Warning,
556    }
557}
558
559fn satisfiability_result_into_issues(
560    result: Result<Vec<Issue>, Vec<Issue>>,
561) -> impl Iterator<Item = Issue> {
562    match result {
563        Ok(hints) => hints.into_iter(),
564        Err(errors) => errors.into_iter(),
565    }
566}
567
568// converts subgraph definitions to Subgraph<Validated> by assuming schema is already
569// expanded/upgraded/validated
570fn assume_subgraph_validated(
571    definition: SubgraphDefinition,
572) -> Result<Subgraph<Validated>, SubgraphError> {
573    Subgraph::parse(
574        definition.name.as_str(),
575        definition.url.as_str(),
576        definition.sdl.as_str(),
577    )
578    .and_then(|s| s.assume_expanded())
579    .map(|s| s.assume_validated())
580}
581
582fn convert_subgraph_error_to_issues(error: SubgraphError) -> Vec<Issue> {
583    error
584        .to_composition_errors()
585        .map(|err| err.into())
586        .collect()
587}