apollo_composition/
lib.rs

1use apollo_compiler::{schema::ExtendedType, Schema};
2use apollo_federation::composition::{
3    expand_subgraphs, merge_subgraphs, post_merge_validations, pre_merge_validations,
4    upgrade_subgraphs_if_necessary, validate_satisfiability, Supergraph,
5};
6use apollo_federation::connectors::{
7    expand::{expand_connectors, Connectors, ExpansionResult},
8    validation::{validate, Severity as ValidationSeverity, ValidationResult},
9    Connector,
10};
11use apollo_federation::internal_composition_api::validate_cache_tag_directives;
12use apollo_federation::subgraph::typestate::{Initial, Subgraph, Validated};
13use apollo_federation::subgraph::SubgraphError;
14use apollo_federation_types::build_plugin::PluginResult;
15use apollo_federation_types::composition::{MergeResult, SubgraphLocation};
16use apollo_federation_types::{
17    composition::{Issue, Severity},
18    javascript::SubgraphDefinition,
19};
20use std::collections::HashMap;
21use std::iter::once;
22use std::sync::Arc;
23
24/// This trait includes all the Rust-side composition logic, plus hooks for the JavaScript side.
25/// If you implement the functions in this trait to build your own JavaScript interface, then you
26/// can call [`HybridComposition::compose`] to run the complete composition process.
27///
28/// JavaScript should be implemented using `@apollo/composition@2.9.0-connectors.0`.
29#[allow(async_fn_in_trait)]
30pub trait HybridComposition {
31    /// Call the JavaScript `composeServices` function from `@apollo/composition` plus whatever
32    /// extra logic you need. Make sure to disable satisfiability, like `composeServices(definitions, {runSatisfiability: false})`
33    async fn compose_services_without_satisfiability(
34        &mut self,
35        subgraph_definitions: Vec<SubgraphDefinition>,
36    ) -> Option<SupergraphSdl<'_>>;
37
38    /// Call the JavaScript `validateSatisfiability` function from `@apollo/composition` plus whatever
39    /// extra logic you need.
40    ///
41    /// # Input
42    ///
43    /// The `validateSatisfiability` function wants an argument like `{ supergraphSdl }`. That field
44    /// should be the value that's updated when [`update_supergraph_sdl`] is called.
45    ///
46    /// # Output
47    ///
48    /// If satisfiability completes from JavaScript, either a list of hints (could be empty, the Ok case) or a list
49    /// of errors (never empty, the Err case) will be returned. If Satisfiability _can't_ be run, you can return a single error
50    /// (`Err(vec![Issue])`) indicating what went wrong.
51    async fn validate_satisfiability(&mut self) -> Result<Vec<Issue>, Vec<Issue>>;
52
53    /// Allows the Rust composition code to modify the stored supergraph SDL
54    /// (for example, to expand connectors).
55    fn update_supergraph_sdl(&mut self, supergraph_sdl: String);
56
57    /// When the Rust composition/validation code finds issues, it will call this method to add
58    /// them to the list of issues that will be returned to the user.
59    ///
60    /// It's on the implementor of this trait to convert `From<Issue>`
61    fn add_issues<Source: Iterator<Item = Issue>>(&mut self, issues: Source);
62
63    /// Runs the complete composition process, hooking into both the Rust and JavaScript implementations.
64    ///
65    /// # Asyncness
66    ///
67    /// While this function is async to allow for flexible JavaScript execution, it is a CPU-heavy task.
68    /// Take care when consuming this in an async context, as it may block longer than desired.
69    ///
70    /// # Algorithm
71    ///
72    /// 1. Run Rust-based validation on the subgraphs
73    /// 2. Call [`compose_services_without_satisfiability`] to run JavaScript-based composition
74    /// 3. Run Rust-based validation on the supergraph
75    /// 4. Call [`validate_satisfiability`] to run JavaScript-based validation on the supergraph
76    async fn compose(&mut self, subgraph_definitions: Vec<SubgraphDefinition>) {
77        let mut cache_tag_errors = Vec::new();
78        for subgraph_def in &subgraph_definitions {
79            match validate_cache_tag_directives(
80                &subgraph_def.name,
81                &subgraph_def.url,
82                &subgraph_def.sdl,
83            ) {
84                Err(err) => {
85                    self.add_issues(once(Issue {
86                            code: "INTERNAL_ERROR".to_string(),
87                            message: format!(
88                                "Composition failed due to an internal error when validating cache tag, please report this: {err}"
89                            ),
90                            locations: vec![],
91                            severity: Severity::Error,
92                        }));
93                    return;
94                }
95                Ok(res) => {
96                    if !res.errors.is_empty() {
97                        cache_tag_errors.extend(res.errors.into_iter().map(|err| {
98                            Issue {
99                                code: err.code(),
100                                message: err.message(),
101                                locations: err
102                                    .locations
103                                    .into_iter()
104                                    .map(|range| SubgraphLocation {
105                                        subgraph: Some(subgraph_def.name.clone()),
106                                        range: Some(range),
107                                    })
108                                    .collect(),
109                                severity: Severity::Error,
110                            }
111                        }));
112                    }
113                }
114            }
115        }
116        if !cache_tag_errors.is_empty() {
117            self.add_issues(cache_tag_errors.into_iter());
118            return;
119        }
120
121        // connectors subgraph validations
122        let ConnectorsValidationResult {
123            subgraphs,
124            parsed_subgraphs,
125            hints: connector_hints,
126        } = match validate_connector_subgraphs(subgraph_definitions) {
127            Ok(results) => results,
128            Err(errors) => {
129                self.add_issues(errors.into_iter());
130                return;
131            }
132        };
133        self.add_issues(connector_hints.into_iter());
134
135        let Some(supergraph_sdl) = self
136            .compose_services_without_satisfiability(subgraphs)
137            .await
138        else {
139            return;
140        };
141
142        // Any issues with overrides are fatal since they'll cause errors in expansion,
143        // so we return early if we see any.
144        let override_errors = validate_overrides(parsed_subgraphs);
145        if !override_errors.is_empty() {
146            self.add_issues(override_errors.into_iter());
147            return;
148        }
149
150        let expansion_result = match expand_connectors(supergraph_sdl, &Default::default()) {
151            Ok(result) => result,
152            Err(err) => {
153                self.add_issues(once(Issue {
154                    code: "INTERNAL_ERROR".to_string(),
155                    message: format!(
156                        "Composition failed due to an internal error when expanding connectors, please report this: {err}"
157                    ),
158                    locations: vec![],
159                    severity: Severity::Error,
160                }));
161                return;
162            }
163        };
164        match expansion_result {
165            ExpansionResult::Expanded {
166                raw_sdl,
167                connectors: Connectors {
168                    by_service_name, ..
169                },
170                ..
171            } => {
172                let original_supergraph_sdl = supergraph_sdl.to_string();
173                self.update_supergraph_sdl(raw_sdl);
174                let satisfiability_result = self.validate_satisfiability().await;
175                self.add_issues(
176                    satisfiability_result_into_issues(satisfiability_result).map(|mut issue| {
177                        sanitize_connectors_issue(&mut issue, by_service_name.iter());
178                        issue
179                    }),
180                );
181
182                self.update_supergraph_sdl(original_supergraph_sdl);
183            }
184            ExpansionResult::Unchanged => {
185                let satisfiability_result = self.validate_satisfiability().await;
186                self.add_issues(satisfiability_result_into_issues(satisfiability_result));
187            }
188        }
189    }
190
191    /// <div class="warning">*** EXPERIMENTAL ***</div>
192    ///
193    /// Runs the composition process with granular composition phases that allow replacing individual
194    /// steps with Rust and/or JavaScript implementations.
195    ///
196    /// 1. <connectors> subgraph validation
197    /// 2. Initialize subgraphs - parses SDL into a GraphQL schema
198    /// 3. Expands subgraphs - adds all missing federation definitions
199    /// 4. Upgrade subgraphs - upgrades fed v1 schemas to fed v2
200    /// 5. Validate subgraphs
201    /// 6. Pre-merge validations (includes connectors validations)
202    /// 7. Merge subgraphs into a supergrpah
203    /// 8. Post merge validations
204    /// 9. <connectors> expand supergraph
205    /// 10. Validate satisfiability
206    ///
207    /// In case of a composition failure, we return a list of errors from the current composition
208    /// phase.
209    async fn experimental_compose(
210        mut self,
211        subgraph_definitions: Vec<SubgraphDefinition>,
212    ) -> Result<PluginResult, Vec<Issue>>
213    where
214        Self: Sized,
215    {
216        // connectors validations
217        // Any issues with overrides are fatal since they'll cause errors in expansion,
218        // so we return early if we see any.
219        // TODO those validations should be moved to subgraph validations in the apollo-federation crate instead
220        let ConnectorsValidationResult {
221            subgraphs: connected_subgraphs,
222            parsed_subgraphs,
223            hints: connector_hints,
224        } = validate_connector_subgraphs(subgraph_definitions)?;
225
226        let upgraded_subgraphs = self
227            .experimental_upgrade_subgraphs(connected_subgraphs)
228            .await?;
229
230        // merge
231        let merge_result = self
232            .experimental_merge_subgraphs(upgraded_subgraphs)
233            .await?;
234
235        // Extra connectors validation after merging.
236        // - So that connectors-related override errors will only be reported if merging was
237        //   successful.
238        let override_errors = validate_overrides(parsed_subgraphs);
239        if !override_errors.is_empty() {
240            return Err(override_errors);
241        }
242
243        // expand connectors as needed
244        let supergraph_sdl = merge_result.supergraph.clone();
245        let expansion_result = match expand_connectors(&supergraph_sdl, &Default::default()) {
246            Ok(result) => result,
247            Err(err) => {
248                return Err(vec![err.into()]);
249            }
250        };
251
252        // verify satisfiability
253        match expansion_result {
254            ExpansionResult::Expanded {
255                raw_sdl,
256                connectors: Connectors {
257                    by_service_name, ..
258                },
259                ..
260            } => {
261                self.experimental_validate_satisfiability(raw_sdl.as_str())
262                    .await
263                    .map(|s| {
264                        let mut composition_hints = merge_result.hints;
265                        composition_hints.extend(s);
266
267                        let mut build_messages: Vec<_> =
268                            connector_hints.into_iter().map(|h| h.into()).collect();
269                        build_messages.extend(composition_hints.into_iter().map(|h| {
270                            let mut issue = Into::<Issue>::into(h);
271                            sanitize_connectors_issue(&mut issue, by_service_name.iter());
272                            issue.into()
273                        }));
274                        // return original supergraph
275                        PluginResult::new(Ok(supergraph_sdl), build_messages)
276                    })
277                    .map_err(|err| {
278                        err.into_iter()
279                            .map(|mut issue| {
280                                sanitize_connectors_issue(&mut issue, by_service_name.iter());
281                                issue
282                            })
283                            .collect()
284                    })
285            }
286            ExpansionResult::Unchanged => self
287                .experimental_validate_satisfiability(supergraph_sdl.as_str())
288                .await
289                .map(|s| {
290                    let mut hints = merge_result.hints;
291                    hints.extend(s);
292
293                    let build_messages: Vec<_> = hints
294                        .into_iter()
295                        .map(|h| Into::<Issue>::into(h).into())
296                        .collect();
297                    PluginResult::new(Ok(supergraph_sdl), build_messages)
298                }),
299        }
300    }
301
302    /// Maps to buildSubgraph & upgradeSubgraphsIfNecessary and performs following steps
303    ///
304    /// 1. Parses raw SDL schemas into Subgraph<Initial>
305    /// 2. Adds missing federation definitions to the subgraph schemas
306    /// 3. Upgrades federation v1 subgraphs to federation v2 schemas.
307    ///    This is a no-op if it is already a federation v2 subgraph.
308    /// 4. Validates the expanded/upgraded subgraph schemas.
309    async fn experimental_upgrade_subgraphs(
310        &mut self,
311        subgraphs: Vec<SubgraphDefinition>,
312    ) -> Result<Vec<SubgraphDefinition>, Vec<Issue>> {
313        let mut issues: Vec<Issue> = vec![];
314        let initial: Vec<Subgraph<Initial>> = subgraphs
315            .into_iter()
316            .map(|s| s.try_into())
317            .filter_map(|r| {
318                r.map_err(|e: SubgraphError| issues.extend(convert_subgraph_error_to_issues(e)))
319                    .ok()
320            })
321            .collect();
322        if !issues.is_empty() {
323            return Err(issues);
324        }
325        expand_subgraphs(initial)
326            .and_then(upgrade_subgraphs_if_necessary)
327            .map(|subgraphs| subgraphs.into_iter().map(|s| s.into()).collect())
328            .map_err(|errors| errors.into_iter().map(Issue::from).collect::<Vec<_>>())
329    }
330
331    /// In case of a merge failure, returns a list of errors.
332    async fn experimental_merge_subgraphs(
333        &mut self,
334        subgraphs: Vec<SubgraphDefinition>,
335    ) -> Result<MergeResult, Vec<Issue>> {
336        let mut subgraph_errors = vec![];
337        let validated: Vec<Subgraph<Validated>> = subgraphs
338            .into_iter()
339            .map(assume_subgraph_validated)
340            .filter_map(|r| {
341                r.map_err(|e| subgraph_errors.extend(convert_subgraph_error_to_issues(e)))
342                    .ok()
343            })
344            .collect();
345        if !subgraph_errors.is_empty() {
346            // this should never happen
347            return Err(subgraph_errors);
348        }
349        pre_merge_validations(&validated)
350            .map_err(|errors| errors.into_iter().map(Issue::from).collect::<Vec<_>>())?;
351        let supergraph = merge_subgraphs(validated)
352            .map_err(|errors| errors.into_iter().map(Issue::from).collect::<Vec<_>>())?;
353        post_merge_validations(&supergraph)
354            .map_err(|errors| errors.into_iter().map(Issue::from).collect::<Vec<_>>())?;
355        let hints = supergraph
356            .hints()
357            .iter()
358            .map(|hint| hint.clone().into())
359            .collect();
360        Ok(MergeResult {
361            supergraph: supergraph.schema().to_string(),
362            hints,
363        })
364    }
365
366    /// If successful, returns a list of hints (possibly empty); Otherwise, returns a list of errors.
367    async fn experimental_validate_satisfiability(
368        &mut self,
369        supergraph_sdl: &str,
370    ) -> Result<Vec<Issue>, Vec<Issue>> {
371        let supergraph = Supergraph::parse(supergraph_sdl).map_err(|e| vec![Issue::from(e)])?;
372        validate_satisfiability(supergraph)
373            .map(|s| s.hints().iter().map(|h| h.clone().into()).collect())
374            .map_err(|errors| errors.into_iter().map(Issue::from).collect::<Vec<_>>())
375    }
376}
377
378struct SubgraphSchema {
379    schema: Schema,
380    has_connectors: bool,
381}
382
383struct ConnectorsValidationResult {
384    subgraphs: Vec<SubgraphDefinition>,
385    parsed_subgraphs: HashMap<String, SubgraphSchema>,
386    hints: Vec<Issue>,
387}
388// TODO this should eventually move under expand/validate subgraph logic
389fn validate_connector_subgraphs(
390    subgraph_definitions: Vec<SubgraphDefinition>,
391) -> Result<ConnectorsValidationResult, Vec<Issue>> {
392    let mut subgraph_validation_errors = Vec::new();
393    let mut subgraph_validation_hints = Vec::new();
394    let mut parsed_schemas = HashMap::new();
395    let subgraph_definitions = subgraph_definitions
396        .into_iter()
397        .map(|mut subgraph| {
398            let ValidationResult {
399                errors,
400                has_connectors,
401                schema,
402                transformed,
403            } = validate(subgraph.sdl, &subgraph.name);
404            subgraph.sdl = transformed;
405            for error in errors {
406                let issue = Issue {
407                    code: error.code.to_string(),
408                    message: error.message,
409                    locations: error
410                        .locations
411                        .into_iter()
412                        .map(|range| SubgraphLocation {
413                            subgraph: Some(subgraph.name.clone()),
414                            range: Some(range),
415                        })
416                        .collect(),
417                    severity: convert_severity(error.code.severity()),
418                };
419                if issue.severity == Severity::Error {
420                    subgraph_validation_errors.push(issue);
421                } else {
422                    subgraph_validation_hints.push(issue);
423                }
424            }
425            parsed_schemas.insert(
426                subgraph.name.clone(),
427                SubgraphSchema {
428                    schema,
429                    has_connectors,
430                },
431            );
432            subgraph
433        })
434        .collect();
435
436    if !subgraph_validation_errors.is_empty() {
437        return Err(subgraph_validation_errors);
438    }
439    Ok(ConnectorsValidationResult {
440        subgraphs: subgraph_definitions,
441        parsed_subgraphs: parsed_schemas,
442        hints: subgraph_validation_hints,
443    })
444}
445
446/// Validate overrides for connector-related subgraphs
447///
448/// Overrides mess with the supergraph in ways that can be difficult to detect when
449/// expanding connectors; the supergraph may omit overridden fields and other shenanigans.
450/// To allow for a better developer experience, we check here if any connector-enabled subgraphs
451/// have fields overridden.
452fn validate_overrides(schemas: HashMap<String, SubgraphSchema>) -> Vec<Issue> {
453    let mut override_errors = Vec::new();
454    for (subgraph_name, SubgraphSchema { schema, .. }) in &schemas {
455        // We need to grab all fields in the schema since only fields can have the @override
456        // directive attached
457        macro_rules! extract_directives {
458            ($node:ident) => {
459                $node
460                    .fields
461                    .iter()
462                    .flat_map(|(name, field)| {
463                        field
464                            .directives
465                            .iter()
466                            .map(move |d| (format!("{}.{}", $node.name, name), d))
467                    })
468                    .collect::<Vec<_>>()
469            };
470        }
471
472        let override_directives = schema
473            .types
474            .values()
475            .flat_map(|v| match v {
476                ExtendedType::Object(node) => extract_directives!(node),
477                ExtendedType::Interface(node) => extract_directives!(node),
478                ExtendedType::InputObject(node) => extract_directives!(node),
479
480                // These types do not have fields
481                ExtendedType::Scalar(_) | ExtendedType::Union(_) | ExtendedType::Enum(_) => {
482                    Vec::new()
483                }
484            })
485            .filter(|(_, directive)| {
486                // TODO: The directive name for @override could have been aliased
487                // at the SDL level, so we'll need to extract the aliased name here instead
488                directive.name == "override" || directive.name == "federation__override"
489            });
490
491        // Now see if we have any overrides that try to reference connector subgraphs
492        for (field, directive) in override_directives {
493            // If the override directive does not have a valid `from` field, then there is
494            // no point trying to validate it, as later steps will validate the entire schema.
495            let Ok(Some(overridden_subgraph_name)) = directive
496                .argument_by_name("from", schema)
497                .map(|node| node.as_str())
498            else {
499                continue;
500            };
501
502            if schemas
503                .get(overridden_subgraph_name)
504                .is_some_and(|schema| schema.has_connectors)
505            {
506                override_errors.push(Issue {
507                        code: "OVERRIDE_ON_CONNECTOR".to_string(),
508                        message: format!(
509                            r#"Field "{field}" on subgraph "{subgraph_name}" is trying to override connector-enabled subgraph "{overridden_subgraph_name}", which is not yet supported. See https://go.apollo.dev/connectors/limitations#override-is-partially-unsupported"#,
510                        ),
511                        locations: vec![SubgraphLocation {
512                            subgraph: Some(String::from(overridden_subgraph_name)),
513                            range: directive.line_column_range(&schema.sources),
514                        }],
515                        severity: Severity::Error,
516                    });
517            }
518        }
519    }
520
521    override_errors
522}
523
524fn sanitize_connectors_issue<'a>(
525    issue: &mut Issue,
526    connector_subgraphs: impl Iterator<Item = (&'a Arc<str>, &'a Connector)>,
527) {
528    for (service_name, connector) in connector_subgraphs {
529        issue.message = issue
530            .message
531            .replace(&**service_name, connector.id.subgraph_name.as_str());
532    }
533}
534
535pub type SupergraphSdl<'a> = &'a str;
536
537/// A successfully composed supergraph, optionally with some issues that should be addressed.
538#[derive(Clone, Debug)]
539pub struct PartialSuccess {
540    pub supergraph_sdl: String,
541    pub issues: Vec<Issue>,
542}
543
544fn convert_severity(severity: ValidationSeverity) -> Severity {
545    match severity {
546        ValidationSeverity::Error => Severity::Error,
547        ValidationSeverity::Warning => Severity::Warning,
548    }
549}
550
551fn satisfiability_result_into_issues(
552    result: Result<Vec<Issue>, Vec<Issue>>,
553) -> impl Iterator<Item = Issue> {
554    match result {
555        Ok(hints) => hints.into_iter(),
556        Err(errors) => errors.into_iter(),
557    }
558}
559
560// converts subgraph definitions to Subgraph<Validated> by assuming schema is already
561// expanded/upgraded/validated
562fn assume_subgraph_validated(
563    definition: SubgraphDefinition,
564) -> Result<Subgraph<Validated>, SubgraphError> {
565    Subgraph::parse(
566        definition.name.as_str(),
567        definition.url.as_str(),
568        definition.sdl.as_str(),
569    )
570    .and_then(|s| s.assume_expanded())
571    .map(|s| s.assume_validated())
572}
573
574fn convert_subgraph_error_to_issues(error: SubgraphError) -> Vec<Issue> {
575    error
576        .to_composition_errors()
577        .map(|err| err.into())
578        .collect()
579}