1use std::fmt::Display;
2use std::sync::Arc;
3
4use apollo_compiler::ExecutableDocument;
5use apollo_compiler::ast;
6use apollo_compiler::collections::HashMap;
7use apollo_compiler::validation::Valid;
8use indexmap::IndexSet;
9use serde::Deserialize;
10use serde::Serialize;
11use tower::ServiceExt;
12use tracing::Instrument;
13use tracing::instrument;
14
15use super::execution::ExecutionParameters;
16use super::rewrites;
17use super::selection::Selection;
18use super::selection::execute_selection_set;
19use super::subgraph_context::ContextualArguments;
20use super::subgraph_context::SubgraphContext;
21use super::subgraph_context::build_operation_with_aliasing;
22use crate::error::Error;
23use crate::error::FetchError;
24use crate::error::ValidationErrors;
25use crate::graphql;
26use crate::graphql::Request;
27use crate::http_ext;
28use crate::json_ext;
29use crate::json_ext::Object;
30use crate::json_ext::Path;
31use crate::json_ext::Value;
32use crate::json_ext::ValueExt;
33use crate::plugins::authorization::AuthorizationPlugin;
34use crate::plugins::authorization::CacheKeyMetadata;
35use crate::services::SubgraphRequest;
36use crate::spec::QueryHash;
37use crate::spec::Schema;
38use crate::spec::SchemaHash;
39
/// The kind of GraphQL operation: query, mutation or subscription.
///
/// Variant names are (de)serialized in camelCase, and `Query` is the default.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Hash, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
#[non_exhaustive]
#[cfg_attr(test, derive(schemars::JsonSchema))]
pub enum OperationKind {
    #[default]
    Query,
    Mutation,
    Subscription,
}
51
52impl Display for OperationKind {
53 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
54 write!(f, "{}", self.default_type_name())
55 }
56}
57
58impl OperationKind {
59 pub(crate) const fn default_type_name(&self) -> &'static str {
60 match self {
61 OperationKind::Query => "Query",
62 OperationKind::Mutation => "Mutation",
63 OperationKind::Subscription => "Subscription",
64 }
65 }
66
67 pub(crate) const fn as_apollo_operation_type(&self) -> &'static str {
69 match self {
70 OperationKind::Query => "query",
71 OperationKind::Mutation => "mutation",
72 OperationKind::Subscription => "subscription",
73 }
74 }
75}
76
77impl From<OperationKind> for ast::OperationType {
78 fn from(value: OperationKind) -> Self {
79 match value {
80 OperationKind::Query => ast::OperationType::Query,
81 OperationKind::Mutation => ast::OperationType::Mutation,
82 OperationKind::Subscription => ast::OperationType::Subscription,
83 }
84 }
85}
86
87impl From<ast::OperationType> for OperationKind {
88 fn from(value: ast::OperationType) -> Self {
89 match value {
90 ast::OperationType::Query => OperationKind::Query,
91 ast::OperationType::Mutation => OperationKind::Mutation,
92 ast::OperationType::Subscription => OperationKind::Subscription,
93 }
94 }
95}
96
/// Subgraph schemas, looked up by subgraph (service) name.
pub(crate) type SubgraphSchemas = HashMap<String, SubgraphSchema>;
98
/// A validated subgraph schema together with a hash of its SDL.
pub(crate) struct SubgraphSchema {
    /// The validated schema.
    pub(crate) schema: Arc<Valid<apollo_compiler::Schema>>,
    /// Hash built from the schema serialized without indentation (see `SubgraphSchema::new`).
    pub(crate) hash: SchemaHash,
}
104
105impl SubgraphSchema {
106 pub(crate) fn new(schema: Valid<apollo_compiler::Schema>) -> Self {
107 let sdl = schema.serialize().no_indent().to_string();
108 Self {
109 schema: Arc::new(schema),
110 hash: SchemaHash::new(&sdl),
111 }
112 }
113}
114
/// A query plan node that fetches data from a single subgraph.
///
/// Field names are (de)serialized in camelCase.
#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct FetchNode {
    /// Name of the subgraph this node fetches from; used to look up the subgraph's URL,
    /// schema and service.
    pub(crate) service_name: Arc<str>,

    /// Selections used to build the `representations` variable from the data fetched so far.
    /// When non-empty, the subgraph response is read from its `_entities` key.
    #[serde(skip_serializing_if = "Vec::is_empty")]
    #[serde(default)]
    pub(crate) requires: Vec<Selection>,

    /// Names of the variables copied from the supergraph request into the subgraph request.
    pub(crate) variable_usages: Vec<Arc<str>>,

    /// The operation sent to the subgraph.
    pub(crate) operation: SubgraphOperation,

    /// Operation name sent with the subgraph request, if any.
    pub(crate) operation_name: Option<Arc<str>>,

    /// Kind of `operation`, passed along with the subgraph request.
    pub(crate) operation_kind: OperationKind,

    /// Optional identifier. When it matches an entry of the execution's deferred fetches,
    /// the fetch result is also sent to that entry's sender.
    pub(crate) id: Option<String>,

    /// Rewrites applied to each representation before it is sent to the subgraph.
    pub(crate) input_rewrites: Option<Vec<rewrites::DataRewrite>>,

    /// Rewrites applied to the data returned by the subgraph.
    pub(crate) output_rewrites: Option<Vec<rewrites::DataRewrite>>,

    /// Rewrites handed to `SubgraphContext::new` when the fetch variables are built.
    pub(crate) context_rewrites: Option<Vec<rewrites::DataRewrite>>,

    /// Hash attached to the subgraph request as `query_hash`. It is computed by
    /// `init_parsed_operation_and_hash_subquery` and defaults when absent from the input.
    #[serde(default)]
    pub(crate) schema_aware_hash: Arc<QueryHash>,

    /// Authorization metadata attached to the subgraph request. It is computed by
    /// `extract_authorization_metadata` and defaults when absent from the input.
    #[serde(default)]
    pub(crate) authorization: Arc<CacheKeyMetadata>,
}
160
/// A subgraph operation kept as its serialized text, plus its parsed form once available.
#[derive(Clone)]
pub(crate) struct SubgraphOperation {
    /// The operation as a GraphQL document string. This is the only part that is
    /// (de)serialized, compared and printed.
    serialized: String,
    /// The parsed and validated document. It stays `None` until `init_parsed` succeeds,
    /// unless the value was built with `from_parsed`.
    parsed: Option<Arc<Valid<ExecutableDocument>>>,
}
168
169impl SubgraphOperation {
170 pub(crate) fn from_string(serialized: impl Into<String>) -> Self {
171 Self {
172 serialized: serialized.into(),
173 parsed: None,
174 }
175 }
176
177 pub(crate) fn from_parsed(parsed: impl Into<Arc<Valid<ExecutableDocument>>>) -> Self {
178 let parsed = parsed.into();
179 Self {
180 serialized: parsed.serialize().no_indent().to_string(),
181 parsed: Some(parsed),
182 }
183 }
184
185 pub(crate) fn as_serialized(&self) -> &str {
186 &self.serialized
187 }
188
189 pub(crate) fn init_parsed(
190 &mut self,
191 subgraph_schema: &Valid<apollo_compiler::Schema>,
192 ) -> Result<&Arc<Valid<ExecutableDocument>>, ValidationErrors> {
193 match &mut self.parsed {
194 Some(parsed) => Ok(parsed),
195 option => {
196 let parsed = Arc::new(ExecutableDocument::parse_and_validate(
197 subgraph_schema,
198 &self.serialized,
199 "operation.graphql",
200 )?);
201 Ok(option.insert(parsed))
202 }
203 }
204 }
205
206 pub(crate) fn as_parsed(
207 &self,
208 ) -> Result<&Arc<Valid<ExecutableDocument>>, SubgraphOperationNotInitialized> {
209 self.parsed.as_ref().ok_or(SubgraphOperationNotInitialized)
210 }
211}
212
213#[derive(Debug, displaydoc::Display, thiserror::Error)]
215pub(crate) struct SubgraphOperationNotInitialized;
216
217impl SubgraphOperationNotInitialized {
218 pub(crate) fn into_graphql_errors(self) -> Vec<Error> {
219 vec![
220 graphql::Error::builder()
221 .extension_code(self.code())
222 .message(self.to_string())
223 .build(),
224 ]
225 }
226
227 pub(crate) fn code(&self) -> &'static str {
228 "SUBGRAPH_OPERATION_NOT_INITIALIZED"
229 }
230}
231
232impl Serialize for SubgraphOperation {
233 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
234 where
235 S: serde::Serializer,
236 {
237 self.as_serialized().serialize(serializer)
238 }
239}
240
241impl<'de> Deserialize<'de> for SubgraphOperation {
242 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
243 where
244 D: serde::Deserializer<'de>,
245 {
246 Ok(Self::from_string(String::deserialize(deserializer)?))
247 }
248}
249
250impl PartialEq for SubgraphOperation {
251 fn eq(&self, other: &Self) -> bool {
252 self.as_serialized() == other.as_serialized()
253 }
254}
255
256impl std::fmt::Debug for SubgraphOperation {
257 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
258 std::fmt::Debug::fmt(self.as_serialized(), f)
259 }
260}
261
262impl std::fmt::Display for SubgraphOperation {
263 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
264 std::fmt::Display::fmt(self.as_serialized(), f)
265 }
266}
267
/// The variables of a subgraph request, plus the bookkeeping needed to map the response back.
pub(crate) struct Variables {
    /// Variables sent to the subgraph; includes `representations` when the fetch has
    /// `requires` selections.
    pub(crate) variables: Object,
    /// For each representation (same index as in `representations`), the paths in the
    /// current data that produced it. Empty when the fetch has no `requires` selections.
    pub(crate) inverted_paths: Vec<Vec<Path>>,
    /// Contextual arguments returned by the subgraph context, if there is one.
    pub(crate) contextual_arguments: Option<ContextualArguments>,
}
273
274impl Variables {
275 #[instrument(skip_all, level = "debug", name = "make_variables")]
276 #[allow(clippy::too_many_arguments)]
277 pub(super) fn new(
278 requires: &[Selection],
279 variable_usages: &[Arc<str>],
280 data: &Value,
281 current_dir: &Path,
282 request: &Arc<http::Request<Request>>,
283 schema: &Schema,
284 input_rewrites: &Option<Vec<rewrites::DataRewrite>>,
285 context_rewrites: &Option<Vec<rewrites::DataRewrite>>,
286 ) -> Option<Variables> {
287 let body = request.body();
288 let mut subgraph_context = SubgraphContext::new(data, schema, context_rewrites);
289 if !requires.is_empty() {
290 let mut variables = Object::with_capacity(1 + variable_usages.len());
291
292 variables.extend(variable_usages.iter().filter_map(|key| {
293 body.variables
294 .get_key_value(key.as_ref())
295 .map(|(variable_key, value)| (variable_key.clone(), value.clone()))
296 }));
297
298 let mut inverted_paths: Vec<Vec<Path>> = Vec::new();
299 let mut values: IndexSet<Value> = IndexSet::default();
300 data.select_values_and_paths(schema, current_dir, |path, value| {
301 if let Some(context) = subgraph_context.as_mut() {
303 context.execute_on_path(path);
304 }
305
306 let mut value = execute_selection_set(value, requires, schema, None);
307 if value.as_object().map(|o| !o.is_empty()).unwrap_or(false) {
308 rewrites::apply_rewrites(schema, &mut value, input_rewrites);
309 match values.get_index_of(&value) {
310 Some(index) => {
311 inverted_paths[index].push(path.clone());
312 }
313 None => {
314 inverted_paths.push(vec![path.clone()]);
315 values.insert(value);
316 debug_assert!(inverted_paths.len() == values.len());
317 }
318 }
319 }
320 });
321
322 if values.is_empty() {
323 return None;
324 }
325
326 let representations = Value::Array(Vec::from_iter(values));
327 let contextual_arguments = match subgraph_context.as_mut() {
328 Some(context) => context.add_variables_and_get_args(&mut variables),
329 None => None,
330 };
331
332 variables.insert("representations", representations);
333 Some(Variables {
334 variables,
335 inverted_paths,
336 contextual_arguments,
337 })
338 } else {
339 if !current_dir.is_empty()
345 && data
346 .get_path(schema, current_dir)
347 .map(|value| value.is_null())
348 .unwrap_or(true)
349 {
350 return None;
351 }
352
353 Some(Variables {
354 variables: variable_usages
355 .iter()
356 .filter_map(|key| {
357 body.variables
358 .get_key_value(key.as_ref())
359 .map(|(variable_key, value)| (variable_key.clone(), value.clone()))
360 })
361 .collect::<Object>(),
362 inverted_paths: Vec::new(),
363 contextual_arguments: None,
364 })
365 }
366 }
367}
368
369impl FetchNode {
370 #[allow(clippy::too_many_arguments)]
371 pub(crate) async fn fetch_node<'a>(
372 &'a self,
373 parameters: &'a ExecutionParameters<'a>,
374 data: &'a Value,
375 current_dir: &'a Path,
376 ) -> (Value, Vec<Error>) {
377 let FetchNode {
378 operation,
379 operation_kind,
380 operation_name,
381 service_name,
382 ..
383 } = self;
384
385 let Variables {
386 variables,
387 inverted_paths: paths,
388 contextual_arguments,
389 } = match Variables::new(
390 &self.requires,
391 &self.variable_usages,
392 data,
393 current_dir,
394 parameters.supergraph_request,
396 parameters.schema,
397 &self.input_rewrites,
398 &self.context_rewrites,
399 ) {
400 Some(variables) => variables,
401 None => {
402 return (Value::Object(Object::default()), Vec::new());
403 }
404 };
405
406 let alias_query_string; let aliased_operation = if let Some(ctx_arg) = contextual_arguments {
408 if let Some(subgraph_schema) =
409 parameters.subgraph_schemas.get(&service_name.to_string())
410 {
411 match build_operation_with_aliasing(operation, &ctx_arg, &subgraph_schema.schema) {
412 Ok(op) => {
413 alias_query_string = op.serialize().no_indent().to_string();
414 alias_query_string.as_str()
415 }
416 Err(errors) => {
417 tracing::debug!(
418 "couldn't generate a valid executable document? {:?}",
419 errors
420 );
421 operation.as_serialized()
422 }
423 }
424 } else {
425 tracing::debug!(
426 "couldn't find a subgraph schema for service {:?}",
427 &service_name
428 );
429 operation.as_serialized()
430 }
431 } else {
432 operation.as_serialized()
433 };
434
435 let mut subgraph_request = SubgraphRequest::builder()
436 .supergraph_request(parameters.supergraph_request.clone())
437 .subgraph_request(
438 http_ext::Request::builder()
439 .method(http::Method::POST)
440 .uri(
441 parameters
442 .schema
443 .subgraph_url(service_name)
444 .unwrap_or_else(|| {
445 panic!(
446 "schema uri for subgraph '{service_name}' should already have been checked"
447 )
448 })
449 .clone(),
450 )
451 .body(
452 Request::builder()
453 .query(aliased_operation)
454 .and_operation_name(operation_name.as_ref().map(|n| n.to_string()))
455 .variables(variables.clone())
456 .build(),
457 )
458 .build()
459 .expect("it won't fail because the url is correct and already checked; qed"),
460 )
461 .subgraph_name(self.service_name.to_string())
462 .operation_kind(*operation_kind)
463 .context(parameters.context.clone())
464 .build();
465 subgraph_request.query_hash = self.schema_aware_hash.clone();
466 subgraph_request.authorization = self.authorization.clone();
467
468 let service = parameters
469 .service_factory
470 .create(service_name)
471 .expect("we already checked that the service exists during planning; qed");
472
473 let (_parts, response) = match service
474 .oneshot(subgraph_request)
475 .instrument(tracing::trace_span!("subfetch_stream"))
476 .await
477 .map_err(|e| match e.downcast::<FetchError>() {
482 Ok(inner) => match *inner {
483 FetchError::SubrequestHttpError { .. } => *inner,
484 _ => FetchError::SubrequestHttpError {
485 status_code: None,
486 service: service_name.to_string(),
487 reason: inner.to_string(),
488 },
489 },
490 Err(e) => FetchError::SubrequestHttpError {
491 status_code: None,
492 service: service_name.to_string(),
493 reason: e.to_string(),
494 },
495 }) {
496 Err(e) => {
497 return (
498 Value::default(),
499 vec![e.to_graphql_error(Some(current_dir.to_owned()))],
500 );
501 }
502 Ok(res) => res.response.into_parts(),
503 };
504
505 super::log::trace_subfetch(
506 service_name,
507 operation.as_serialized(),
508 &variables,
509 &response,
510 );
511
512 if !response.is_primary() {
513 return (
514 Value::default(),
515 vec![
516 FetchError::SubrequestUnexpectedPatchResponse {
517 service: service_name.to_string(),
518 }
519 .to_graphql_error(Some(current_dir.to_owned())),
520 ],
521 );
522 }
523
524 let (value, errors) =
525 self.response_at_path(parameters.schema, current_dir, paths, response);
526 if let Some(id) = &self.id {
527 if let Some(sender) = parameters.deferred_fetches.get(id.as_str()) {
528 u64_counter!(
529 "apollo.router.operations.defer.fetch",
530 "Number of deferred responses fetched from subgraphs",
531 1
532 );
533 if let Err(e) = sender.clone().send((value.clone(), errors.clone())) {
534 tracing::error!(
535 "error sending fetch result at path {} and id {:?} for deferred response building: {}",
536 current_dir,
537 self.id,
538 e
539 );
540 }
541 }
542 }
543 (value, errors)
544 }
545
546 #[instrument(skip_all, level = "debug", name = "response_insert")]
547 fn response_at_path<'a>(
548 &'a self,
549 schema: &Schema,
550 current_dir: &'a Path,
551 inverted_paths: Vec<Vec<Path>>,
552 response: graphql::Response,
553 ) -> (Value, Vec<Error>) {
554 if !self.requires.is_empty() {
555 let entities_path = Path(vec![json_ext::PathElement::Key(
556 "_entities".to_string(),
557 None,
558 )]);
559
560 let mut errors: Vec<Error> = vec![];
561 for mut error in response.errors {
562 error.locations = Vec::new();
565
566 if let Some(ref path) = error.path {
568 if path.starts_with(&entities_path) {
569 match path.0.get(1) {
572 Some(json_ext::PathElement::Index(i)) => {
573 for values_path in
574 inverted_paths.get(*i).iter().flat_map(|v| v.iter())
575 {
576 errors.push(Error {
577 locations: error.locations.clone(),
578 path: Some(Path::from_iter(
581 values_path.0.iter().chain(&path.0[2..]).cloned(),
582 )),
583 message: error.message.clone(),
584 extensions: error.extensions.clone(),
585 })
586 }
587 }
588 _ => {
589 error.path = Some(current_dir.clone());
590 errors.push(error)
591 }
592 }
593 } else {
594 error.path = Some(current_dir.clone());
595 errors.push(error);
596 }
597 } else {
598 error.path = Some(current_dir.clone());
599 errors.push(error);
600 }
601 }
602
603 if let Some(Value::Object(mut map)) = response.data {
606 if let Some(entities) = map.remove("_entities") {
607 tracing::trace!("received entities: {:?}", &entities);
608
609 if let Value::Array(array) = entities {
610 let mut value = Value::default();
611
612 for (index, mut entity) in array.into_iter().enumerate() {
613 rewrites::apply_rewrites(schema, &mut entity, &self.output_rewrites);
614
615 if let Some(paths) = inverted_paths.get(index) {
616 if paths.len() > 1 {
617 for path in &paths[1..] {
618 let _ = value.insert(path, entity.clone());
619 }
620 }
621
622 if let Some(path) = paths.first() {
623 let _ = value.insert(path, entity);
624 }
625 }
626 }
627 return (value, errors);
628 }
629 }
630 }
631
632 if errors.is_empty() {
637 tracing::warn!(
638 "Subgraph response from '{}' was missing key `_entities` and had no errors. This is likely a bug in the subgraph.",
639 self.service_name
640 );
641 }
642
643 (Value::Null, errors)
644 } else {
645 let current_slice =
646 if matches!(current_dir.last(), Some(&json_ext::PathElement::Flatten(_))) {
647 ¤t_dir.0[..current_dir.0.len() - 1]
648 } else {
649 ¤t_dir.0[..]
650 };
651
652 let errors: Vec<Error> = response
653 .errors
654 .into_iter()
655 .map(|error| {
656 let path = error
657 .path
658 .as_ref()
659 .map(|path| {
660 Path::from_iter(current_slice.iter().chain(path.iter()).cloned())
661 })
662 .unwrap_or_else(|| current_dir.clone());
663
664 Error {
665 locations: error.locations,
666 path: Some(path),
667 message: error.message,
668 extensions: error.extensions,
669 }
670 })
671 .collect();
672 let mut data = response.data.unwrap_or_default();
673 rewrites::apply_rewrites(schema, &mut data, &self.output_rewrites);
674 (Value::from_path(current_dir, data), errors)
675 }
676 }
677
/// Returns the name of the subgraph this node fetches from (test builds only).
#[cfg(test)]
pub(crate) fn service_name(&self) -> &str {
    self.service_name.as_ref()
}
682
683 pub(crate) fn operation_kind(&self) -> &OperationKind {
684 &self.operation_kind
685 }
686
687 pub(crate) fn init_parsed_operation(
688 &mut self,
689 subgraph_schemas: &SubgraphSchemas,
690 ) -> Result<(), ValidationErrors> {
691 let schema = &subgraph_schemas[self.service_name.as_ref()];
692 self.operation.init_parsed(&schema.schema)?;
693 Ok(())
694 }
695
696 pub(crate) fn init_parsed_operation_and_hash_subquery(
697 &mut self,
698 subgraph_schemas: &SubgraphSchemas,
699 ) -> Result<(), ValidationErrors> {
700 let schema = &subgraph_schemas[self.service_name.as_ref()];
701 self.operation.init_parsed(&schema.schema)?;
702 self.schema_aware_hash = Arc::new(schema.hash.operation_hash(
703 self.operation.as_serialized(),
704 self.operation_name.as_deref(),
705 ));
706 Ok(())
707 }
708
709 pub(crate) fn extract_authorization_metadata(
710 &mut self,
711 schema: &Valid<apollo_compiler::Schema>,
712 global_authorisation_cache_key: &CacheKeyMetadata,
713 ) {
714 let doc = ExecutableDocument::parse(
715 schema,
716 self.operation.as_serialized().to_string(),
717 "query.graphql",
718 )
719 .unwrap_or_else(|invalid| invalid.partial);
721 let subgraph_query_cache_key = AuthorizationPlugin::generate_cache_metadata(
722 &doc,
723 self.operation_name.as_deref(),
724 schema,
725 !self.requires.is_empty(),
726 );
727
728 self.authorization = Arc::new(AuthorizationPlugin::intersect_cache_keys_subgraph(
731 global_authorisation_cache_key,
732 &subgraph_query_cache_key,
733 ));
734 }
735}