apollo_federation/composition/
mod.rs1mod satisfiability;
2
3use std::sync::Arc;
4use std::vec;
5
6use tracing::instrument;
7
8pub use crate::composition::satisfiability::validate_satisfiability;
9use crate::connectors::Connector;
10use crate::connectors::expand::Connectors;
11use crate::connectors::expand::ExpansionResult;
12use crate::connectors::expand::expand_connectors;
13use crate::error::CompositionError;
14use crate::merger::merge::Merger;
15pub use crate::schema::schema_upgrader::upgrade_subgraphs_if_necessary;
16use crate::schema::validators::root_fields::validate_consistent_root_fields;
17use crate::subgraph::typestate::Expanded;
18use crate::subgraph::typestate::Initial;
19use crate::subgraph::typestate::Subgraph;
20use crate::subgraph::typestate::Validated;
21pub use crate::supergraph::Merged;
22pub use crate::supergraph::Satisfiable;
23pub use crate::supergraph::Supergraph;
24
25#[derive(Debug, Default, Clone)]
29pub struct CompositionOptions {
30 pub max_validation_subgraph_paths: Option<usize>,
32}
33
34#[instrument(skip(subgraphs, options))]
36pub fn compose(
37 subgraphs: Vec<Subgraph<Initial>>,
38 options: CompositionOptions,
39) -> Result<Supergraph<Satisfiable>, Vec<CompositionError>> {
40 let mut subgraphs = subgraphs;
43 subgraphs.sort_by(|s1, s2| s1.name.cmp(&s2.name));
44
45 tracing::debug!("Expanding subgraphs...");
46 let expanded_subgraphs = expand_subgraphs(subgraphs)?;
47 tracing::debug!("Upgrading subgraphs...");
48 let validated_subgraphs = upgrade_subgraphs_if_necessary(expanded_subgraphs)?;
49
50 tracing::debug!("Pre-merge validations...");
51 pre_merge_validations(&validated_subgraphs)?;
52 tracing::debug!("Merging subgraphs...");
53 let supergraph = merge_subgraphs(validated_subgraphs, &options)?;
54 tracing::debug!("Post-merge validations...");
55 post_merge_validations(&supergraph)?;
56 tracing::debug!("Validating satisfiability...");
57 validate_satisfiability(supergraph, &options)
58}
59
60pub fn compose_with_connectors(
62 subgraphs: Vec<Subgraph<Initial>>,
63 options: CompositionOptions,
64) -> Result<Supergraph<Satisfiable>, Vec<CompositionError>> {
65 tracing::debug!("Expanding subgraphs...");
72 let expanded_subgraphs = expand_subgraphs(subgraphs)?;
73
74 tracing::debug!("Upgrading subgraphs...");
75 let validated_subgraphs = upgrade_subgraphs_if_necessary(expanded_subgraphs)?;
76
77 tracing::debug!("Pre-merge validations...");
78 pre_merge_validations(&validated_subgraphs)?;
79
80 tracing::debug!("Merging subgraphs...");
81 let supergraph = merge_subgraphs(validated_subgraphs, &options)?;
82
83 tracing::debug!("Post-merge validations...");
84 post_merge_validations(&supergraph)?;
85 tracing::debug!("Validating satisfiability...");
89 validate_satisfiability_with_connectors(supergraph, &options)
90}
91
92#[instrument(skip(subgraphs))]
95pub fn expand_subgraphs(
96 subgraphs: Vec<Subgraph<Initial>>,
97) -> Result<Vec<Subgraph<Expanded>>, Vec<CompositionError>> {
98 let mut errors: Vec<CompositionError> = vec![];
99 let expanded: Vec<Subgraph<Expanded>> = subgraphs
100 .into_iter()
101 .map(|s| s.expand_links())
102 .filter_map(|r| r.map_err(|e| errors.extend(e.to_composition_errors())).ok())
103 .collect();
104 if errors.is_empty() {
105 Ok(expanded)
106 } else {
107 Err(errors)
108 }
109}
110
111#[instrument(skip(subgraphs))]
113pub fn pre_merge_validations(
114 subgraphs: &[Subgraph<Validated>],
115) -> Result<(), Vec<CompositionError>> {
116 validate_consistent_root_fields(subgraphs)?;
117 Ok(())
119}
120
121#[instrument(skip(subgraphs, options))]
122pub fn merge_subgraphs(
123 subgraphs: Vec<Subgraph<Validated>>,
124 options: &CompositionOptions,
125) -> Result<Supergraph<Merged>, Vec<CompositionError>> {
126 let merger = Merger::new(subgraphs, options.clone()).map_err(|e| {
127 vec![CompositionError::InternalError {
128 message: e.to_string(),
129 }]
130 })?;
131 let result = merger.merge().map_err(|e| {
132 vec![CompositionError::InternalError {
133 message: e.to_string(),
134 }]
135 })?;
136 tracing::trace!(
137 "Merge has {} errors and {} hints",
138 result.errors.len(),
139 result.hints.len()
140 );
141 if result.errors.is_empty() {
142 let Some(supergraph_schema) = result.supergraph else {
143 return Err(vec![CompositionError::InternalError {
144 message: "Merge completed with no supergraph schema".to_string(),
145 }]);
146 };
147 let supergraph = Supergraph::with_hints(supergraph_schema, result.hints);
148 Ok(supergraph)
149 } else {
150 Err(result.errors)
151 }
152}
153
154#[instrument(skip(_supergraph))]
155pub fn post_merge_validations(
156 _supergraph: &Supergraph<Merged>,
157) -> Result<(), Vec<CompositionError>> {
158 Ok(())
161}
162
163pub fn validate_satisfiability_with_connectors(
166 supergraph: Supergraph<Merged>,
167 options: &CompositionOptions,
168) -> Result<Supergraph<Satisfiable>, Vec<CompositionError>> {
169 let supergraph_str = supergraph.schema().schema().to_string();
171 let expansion_result = expand_connectors(&supergraph_str, &Default::default())
172 .map_err(|e| vec![CompositionError::InternalError {
173 message: format!("Composition failed due to an internal error when expanding connectors, please report this: {e}"),
174 }])?;
175
176 match expansion_result {
178 ExpansionResult::Expanded {
179 raw_sdl,
180 connectors: Connectors {
181 by_service_name, ..
182 },
183 ..
184 } => {
185 let supergraph = Supergraph::parse(&raw_sdl).map_err(|e| {
186 vec![CompositionError::InternalError {
187 message: e.to_string(),
188 }]
189 })?;
190 let mut result = validate_satisfiability(supergraph, options);
191
192 match &mut result {
194 Ok(supergraph) => {
195 for hint in supergraph.hints_mut() {
196 sanitize_connectors_message(&mut hint.message, by_service_name.iter());
197 }
198 }
199 Err(issues) => {
200 for issue in issues.iter_mut() {
201 sanitize_connectors_error(issue, by_service_name.iter());
202 }
203 }
204 }
205 result
206 }
207 ExpansionResult::Unchanged => validate_satisfiability(supergraph, options),
208 }
209}
210
211fn sanitize_connectors_error<'a>(
212 issue: &mut CompositionError,
213 connector_subgraphs: impl Iterator<Item = (&'a Arc<str>, &'a Connector)>,
214) {
215 match issue {
216 CompositionError::SatisfiabilityError { message } => {
217 sanitize_connectors_message(message, connector_subgraphs);
218 }
219 CompositionError::ShareableHasMismatchedRuntimeTypes { message } => {
220 sanitize_connectors_message(message, connector_subgraphs);
221 }
222 _ => {}
223 }
224}
225
226fn sanitize_connectors_message<'a>(
227 message: &mut String,
228 connector_subgraphs: impl Iterator<Item = (&'a Arc<str>, &'a Connector)>,
229) {
230 for (service_name, connector) in connector_subgraphs {
231 *message = message.replace(&**service_name, connector.id.subgraph_name.as_str());
232 }
233}