1use std::fmt::Display;
2use std::sync::Arc;
3
4use apollo_compiler::ExecutableDocument;
5use apollo_compiler::ast;
6use apollo_compiler::collections::HashMap;
7use apollo_compiler::validation::Valid;
8use apollo_federation::query_plan::requires_selection;
9use apollo_federation::query_plan::serializable_document::SerializableDocument;
10use indexmap::IndexSet;
11use serde::Deserialize;
12use serde::Serialize;
13use serde_json_bytes::ByteString;
14use serde_json_bytes::Map;
15use tokio::sync::broadcast::Sender;
16use tower::ServiceExt;
17use tracing::Instrument;
18use tracing::instrument;
19
20use super::rewrites;
21use super::selection::execute_selection_set;
22use super::subgraph_context::ContextualArguments;
23use super::subgraph_context::SubgraphContext;
24use crate::error::Error;
25use crate::error::FetchError;
26use crate::error::ValidationErrors;
27use crate::graphql;
28use crate::graphql::Request;
29use crate::json_ext;
30use crate::json_ext::Object;
31use crate::json_ext::Path;
32use crate::json_ext::Value;
33use crate::json_ext::ValueExt;
34use crate::plugins::authorization::AuthorizationPlugin;
35use crate::plugins::authorization::CacheKeyMetadata;
36use crate::services::SubgraphRequest;
37use crate::services::fetch::ErrorMapping;
38use crate::services::subgraph::BoxService;
39use crate::spec::QueryHash;
40use crate::spec::Schema;
41use crate::spec::SchemaHash;
42
43#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Hash, Deserialize, Serialize)]
45#[serde(rename_all = "camelCase")]
46#[non_exhaustive]
47#[cfg_attr(test, derive(schemars::JsonSchema))]
48pub enum OperationKind {
49 #[default]
50 Query,
51 Mutation,
52 Subscription,
53}
54
55impl Display for OperationKind {
56 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
57 write!(f, "{}", self.default_type_name())
58 }
59}
60
61impl OperationKind {
62 pub(crate) const fn default_type_name(&self) -> &'static str {
63 match self {
64 OperationKind::Query => "Query",
65 OperationKind::Mutation => "Mutation",
66 OperationKind::Subscription => "Subscription",
67 }
68 }
69
70 pub(crate) const fn as_apollo_operation_type(&self) -> &'static str {
72 match self {
73 OperationKind::Query => "query",
74 OperationKind::Mutation => "mutation",
75 OperationKind::Subscription => "subscription",
76 }
77 }
78}
79
80impl From<OperationKind> for ast::OperationType {
81 fn from(value: OperationKind) -> Self {
82 match value {
83 OperationKind::Query => ast::OperationType::Query,
84 OperationKind::Mutation => ast::OperationType::Mutation,
85 OperationKind::Subscription => ast::OperationType::Subscription,
86 }
87 }
88}
89
90impl From<ast::OperationType> for OperationKind {
91 fn from(value: ast::OperationType) -> Self {
92 match value {
93 ast::OperationType::Query => OperationKind::Query,
94 ast::OperationType::Mutation => OperationKind::Mutation,
95 ast::OperationType::Subscription => OperationKind::Subscription,
96 }
97 }
98}
99
100pub(crate) type SubgraphSchemas = HashMap<String, SubgraphSchema>;
101
102pub(crate) struct SubgraphSchema {
103 pub(crate) schema: Arc<Valid<apollo_compiler::Schema>>,
104 pub(crate) hash: SchemaHash,
106}
107
108impl SubgraphSchema {
109 pub(crate) fn new(schema: Valid<apollo_compiler::Schema>) -> Self {
110 let sdl = schema.serialize().no_indent().to_string();
111 Self {
112 schema: Arc::new(schema),
113 hash: SchemaHash::new(&sdl),
114 }
115 }
116}
117
118#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
120#[serde(rename_all = "camelCase")]
121pub(crate) struct FetchNode {
122 pub(crate) service_name: Arc<str>,
124
125 #[serde(skip_serializing_if = "Vec::is_empty")]
127 #[serde(default)]
128 pub(crate) requires: Vec<requires_selection::Selection>,
129
130 pub(crate) variable_usages: Vec<Arc<str>>,
132
133 pub(crate) operation: SerializableDocument,
135
136 pub(crate) operation_name: Option<Arc<str>>,
138
139 pub(crate) operation_kind: OperationKind,
141
142 pub(crate) id: Option<String>,
144
145 pub(crate) input_rewrites: Option<Vec<rewrites::DataRewrite>>,
147
148 pub(crate) output_rewrites: Option<Vec<rewrites::DataRewrite>>,
150
151 pub(crate) context_rewrites: Option<Vec<rewrites::DataRewrite>>,
153
154 #[serde(default)]
157 pub(crate) schema_aware_hash: Arc<QueryHash>,
158
159 #[serde(default)]
161 pub(crate) authorization: Arc<CacheKeyMetadata>,
162}
163
164#[derive(Default)]
165pub(crate) struct Variables {
166 pub(crate) variables: Object,
167 pub(crate) inverted_paths: Vec<Vec<Path>>,
168 pub(crate) contextual_arguments: Option<ContextualArguments>,
169}
170
171impl Variables {
172 #[instrument(skip_all, level = "debug", name = "make_variables")]
173 #[allow(clippy::too_many_arguments)]
174 pub(crate) fn new(
175 requires: &[requires_selection::Selection],
176 variable_usages: &[Arc<str>],
177 data: &Value,
178 current_dir: &Path,
179 request: &Arc<http::Request<Request>>,
180 schema: &Schema,
181 input_rewrites: &Option<Vec<rewrites::DataRewrite>>,
182 context_rewrites: &Option<Vec<rewrites::DataRewrite>>,
183 ) -> Option<Variables> {
184 let body = request.body();
185 let mut subgraph_context = SubgraphContext::new(data, schema, context_rewrites);
186 if !requires.is_empty() {
187 let mut variables = Object::with_capacity(1 + variable_usages.len());
188
189 variables.extend(variable_usages.iter().filter_map(|key| {
190 body.variables
191 .get_key_value(key.as_ref())
192 .map(|(variable_key, value)| (variable_key.clone(), value.clone()))
193 }));
194
195 let mut inverted_paths: Vec<Vec<Path>> = Vec::new();
196 let mut values: IndexSet<Value> = IndexSet::default();
197 data.select_values_and_paths(schema, current_dir, |path, value| {
198 if let Some(context) = subgraph_context.as_mut() {
200 context.execute_on_path(path);
201 }
202
203 let mut value = execute_selection_set(value, requires, schema, None);
204 if value.as_object().map(|o| !o.is_empty()).unwrap_or(false) {
205 rewrites::apply_rewrites(schema, &mut value, input_rewrites);
206 match values.get_index_of(&value) {
207 Some(index) => {
208 inverted_paths[index].push(path.clone());
209 }
210 None => {
211 inverted_paths.push(vec![path.clone()]);
212 values.insert(value);
213 debug_assert!(inverted_paths.len() == values.len());
214 }
215 }
216 }
217 });
218
219 if values.is_empty() {
220 return None;
221 }
222
223 let representations = Value::Array(Vec::from_iter(values));
224 let contextual_arguments = match subgraph_context.as_mut() {
225 Some(context) => context.add_variables_and_get_args(&mut variables),
226 None => None,
227 };
228
229 variables.insert("representations", representations);
230 Some(Variables {
231 variables,
232 inverted_paths,
233 contextual_arguments,
234 })
235 } else {
236 if !current_dir.is_empty()
242 && data
243 .get_path(schema, current_dir)
244 .map(|value| value.is_null())
245 .unwrap_or(true)
246 {
247 return None;
248 }
249
250 Some(Variables {
251 variables: variable_usages
252 .iter()
253 .filter_map(|key| {
254 body.variables
255 .get_key_value(key.as_ref())
256 .map(|(variable_key, value)| (variable_key.clone(), value.clone()))
257 })
258 .collect::<Object>(),
259 inverted_paths: Vec::new(),
260 contextual_arguments: None,
261 })
262 }
263 }
264}
265
266impl FetchNode {
267 #[allow(clippy::too_many_arguments)]
268 pub(crate) async fn subgraph_fetch(
269 &self,
270 service: BoxService,
271 subgraph_request: SubgraphRequest,
272 current_dir: &Path,
273 schema: &Schema,
274 paths: Vec<Vec<Path>>,
275 operation_str: &str,
276 variables: Map<ByteString, Value>,
277 ) -> (Value, Vec<Error>) {
278 let (_parts, response) = match service
279 .oneshot(subgraph_request)
280 .instrument(tracing::trace_span!("subfetch_stream"))
281 .await
282 .map_to_graphql_error(self.service_name.to_string(), current_dir)
283 {
284 Err(e) => {
285 return (Value::default(), vec![e]);
286 }
287 Ok(res) => res.response.into_parts(),
288 };
289
290 super::log::trace_subfetch(&self.service_name, operation_str, &variables, &response);
291
292 if !response.is_primary() {
293 return (
294 Value::default(),
295 vec![
296 FetchError::SubrequestUnexpectedPatchResponse {
297 service: self.service_name.to_string(),
298 }
299 .to_graphql_error(Some(current_dir.to_owned())),
300 ],
301 );
302 }
303
304 let (value, errors) = self.response_at_path(schema, current_dir, paths, response);
305
306 (value, errors)
307 }
308
309 pub(crate) fn deferred_fetches(
310 current_dir: &Path,
311 id: &Option<String>,
312 deferred_fetches: &std::collections::HashMap<String, Sender<(Value, Vec<Error>)>>,
313 value: &Value,
314 errors: &[Error],
315 ) {
316 if let Some(id) = id
317 && let Some(sender) = deferred_fetches.get(id.as_str())
318 {
319 u64_counter!(
320 "apollo.router.operations.defer.fetch",
321 "Number of deferred responses fetched from subgraphs",
322 1
323 );
324 if let Err(e) = sender.clone().send((value.clone(), Vec::from(errors))) {
325 tracing::error!(
326 "error sending fetch result at path {} and id {:?} for deferred response building: {}",
327 current_dir,
328 id,
329 e
330 );
331 }
332 }
333 }
334
335 #[instrument(skip_all, level = "debug", name = "response_insert")]
336 pub(crate) fn response_at_path<'a>(
337 &'a self,
338 schema: &Schema,
339 current_dir: &'a Path,
340 inverted_paths: Vec<Vec<Path>>,
341 response: graphql::Response,
342 ) -> (Value, Vec<Error>) {
343 if !self.requires.is_empty() {
344 let entities_path = Path(vec![json_ext::PathElement::Key(
345 "_entities".to_string(),
346 None,
347 )]);
348
349 let mut errors: Vec<Error> = vec![];
350 for mut error in response.errors {
351 error.locations = Vec::new();
354
355 if let Some(ref path) = error.path {
357 if path.starts_with(&entities_path) {
358 match path.0.get(1) {
361 Some(json_ext::PathElement::Index(i)) => {
362 for values_path in
363 inverted_paths.get(*i).iter().flat_map(|v| v.iter())
364 {
365 errors.push(
366 Error::builder()
367 .locations(error.locations.clone())
368 .path(Path::from_iter(
371 values_path.0.iter().chain(&path.0[2..]).cloned(),
372 ))
373 .message(error.message.clone())
374 .and_extension_code(error.extension_code())
375 .extensions(error.extensions.clone())
376 .apollo_id(error.apollo_id())
378 .build(),
379 )
380 }
381 }
382 _ => {
383 error.path = Some(current_dir.clone());
384 errors.push(error)
385 }
386 }
387 } else {
388 error.path = Some(current_dir.clone());
389 errors.push(error);
390 }
391 } else {
392 error.path = Some(current_dir.clone());
393 errors.push(error);
394 }
395 }
396
397 if let Some(Value::Object(mut map)) = response.data
400 && let Some(entities) = map.remove("_entities")
401 {
402 tracing::trace!("received entities: {:?}", &entities);
403
404 if let Value::Array(array) = entities {
405 let mut value = Value::default();
406
407 for (index, mut entity) in array.into_iter().enumerate() {
408 rewrites::apply_rewrites(schema, &mut entity, &self.output_rewrites);
409
410 if let Some(paths) = inverted_paths.get(index) {
411 if paths.len() > 1 {
412 for path in &paths[1..] {
413 let _ = value.insert(path, entity.clone());
414 }
415 }
416
417 if let Some(path) = paths.first() {
418 let _ = value.insert(path, entity);
419 }
420 }
421 }
422 return (value, errors);
423 }
424 }
425
426 if errors.is_empty() {
431 tracing::warn!(
432 "Subgraph response from '{}' was missing key `_entities` and had no errors. This is likely a bug in the subgraph.",
433 self.service_name
434 );
435 }
436
437 (Value::Null, errors)
438 } else {
439 let current_slice =
440 if matches!(current_dir.last(), Some(&json_ext::PathElement::Flatten(_))) {
441 ¤t_dir.0[..current_dir.0.len() - 1]
442 } else {
443 ¤t_dir.0[..]
444 };
445
446 let errors: Vec<Error> = response
447 .errors
448 .into_iter()
449 .map(|error| {
450 let path = error
451 .path
452 .as_ref()
453 .map(|path| {
454 Path::from_iter(current_slice.iter().chain(path.iter()).cloned())
455 })
456 .unwrap_or_else(|| current_dir.clone());
457
458 Error::builder()
459 .locations(error.locations.clone())
460 .path(path)
461 .message(error.message.clone())
462 .and_extension_code(error.extension_code())
463 .extensions(error.extensions.clone())
464 .apollo_id(error.apollo_id())
465 .build()
466 })
467 .collect();
468 let mut data = response.data.unwrap_or_default();
469 rewrites::apply_rewrites(schema, &mut data, &self.output_rewrites);
470 (Value::from_path(current_dir, data), errors)
471 }
472 }
473
/// Returns the name of the subgraph this node fetches from (test builds only).
#[cfg(test)]
pub(crate) fn service_name(&self) -> &str {
    self.service_name.as_ref()
}
478
479 pub(crate) fn operation_kind(&self) -> &OperationKind {
480 &self.operation_kind
481 }
482
483 pub(crate) fn init_parsed_operation(
484 &mut self,
485 subgraph_schemas: &SubgraphSchemas,
486 ) -> Result<(), ValidationErrors> {
487 let schema = &subgraph_schemas[self.service_name.as_ref()];
488 self.operation.init_parsed(&schema.schema)?;
489 Ok(())
490 }
491
492 pub(crate) fn init_parsed_operation_and_hash_subquery(
493 &mut self,
494 subgraph_schemas: &SubgraphSchemas,
495 ) -> Result<(), ValidationErrors> {
496 let schema = &subgraph_schemas[self.service_name.as_ref()];
497 self.operation.init_parsed(&schema.schema)?;
498 self.schema_aware_hash = Arc::new(schema.hash.operation_hash(
499 self.operation.as_serialized(),
500 self.operation_name.as_deref(),
501 ));
502 Ok(())
503 }
504
505 pub(crate) fn extract_authorization_metadata(
506 &mut self,
507 schema: &Valid<apollo_compiler::Schema>,
508 global_authorisation_cache_key: &CacheKeyMetadata,
509 ) {
510 let doc = ExecutableDocument::parse(
511 schema,
512 self.operation.as_serialized().to_string(),
513 "query.graphql",
514 )
515 .unwrap_or_else(|invalid| invalid.partial);
517 let subgraph_query_cache_key = AuthorizationPlugin::generate_cache_metadata(
518 &doc,
519 self.operation_name.as_deref(),
520 schema,
521 !self.requires.is_empty(),
522 );
523
524 self.authorization = Arc::new(AuthorizationPlugin::intersect_cache_keys_subgraph(
527 global_authorisation_cache_key,
528 &subgraph_query_cache_key,
529 ));
530 }
531}