Skip to main content

apollo_router/query_planner/
fetch.rs

1use std::fmt::Display;
2use std::sync::Arc;
3
4use apollo_compiler::ExecutableDocument;
5use apollo_compiler::ast;
6use apollo_compiler::collections::HashMap;
7use apollo_compiler::validation::Valid;
8use indexmap::IndexSet;
9use serde::Deserialize;
10use serde::Serialize;
11use tower::ServiceExt;
12use tracing::Instrument;
13use tracing::instrument;
14
15use super::execution::ExecutionParameters;
16use super::rewrites;
17use super::selection::Selection;
18use super::selection::execute_selection_set;
19use super::subgraph_context::ContextualArguments;
20use super::subgraph_context::SubgraphContext;
21use super::subgraph_context::build_operation_with_aliasing;
22use crate::error::Error;
23use crate::error::FetchError;
24use crate::error::ValidationErrors;
25use crate::graphql;
26use crate::graphql::Request;
27use crate::http_ext;
28use crate::json_ext;
29use crate::json_ext::Object;
30use crate::json_ext::Path;
31use crate::json_ext::Value;
32use crate::json_ext::ValueExt;
33use crate::plugins::authorization::AuthorizationPlugin;
34use crate::plugins::authorization::CacheKeyMetadata;
35use crate::services::SubgraphRequest;
36use crate::spec::QueryHash;
37use crate::spec::Schema;
38use crate::spec::SchemaHash;
39
40/// GraphQL operation type.
41#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Hash, Deserialize, Serialize)]
42#[serde(rename_all = "camelCase")]
43#[non_exhaustive]
44#[cfg_attr(test, derive(schemars::JsonSchema))]
45pub enum OperationKind {
46    #[default]
47    Query,
48    Mutation,
49    Subscription,
50}
51
52impl Display for OperationKind {
53    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
54        write!(f, "{}", self.default_type_name())
55    }
56}
57
58impl OperationKind {
59    pub(crate) const fn default_type_name(&self) -> &'static str {
60        match self {
61            OperationKind::Query => "Query",
62            OperationKind::Mutation => "Mutation",
63            OperationKind::Subscription => "Subscription",
64        }
65    }
66
67    /// Only for apollo studio exporter
68    pub(crate) const fn as_apollo_operation_type(&self) -> &'static str {
69        match self {
70            OperationKind::Query => "query",
71            OperationKind::Mutation => "mutation",
72            OperationKind::Subscription => "subscription",
73        }
74    }
75}
76
77impl From<OperationKind> for ast::OperationType {
78    fn from(value: OperationKind) -> Self {
79        match value {
80            OperationKind::Query => ast::OperationType::Query,
81            OperationKind::Mutation => ast::OperationType::Mutation,
82            OperationKind::Subscription => ast::OperationType::Subscription,
83        }
84    }
85}
86
87impl From<ast::OperationType> for OperationKind {
88    fn from(value: ast::OperationType) -> Self {
89        match value {
90            ast::OperationType::Query => OperationKind::Query,
91            ast::OperationType::Mutation => OperationKind::Mutation,
92            ast::OperationType::Subscription => OperationKind::Subscription,
93        }
94    }
95}
96
97pub(crate) type SubgraphSchemas = HashMap<String, SubgraphSchema>;
98
99pub(crate) struct SubgraphSchema {
100    pub(crate) schema: Arc<Valid<apollo_compiler::Schema>>,
101    // TODO: Ideally should have separate nominal type for subgraph's schema hash
102    pub(crate) hash: SchemaHash,
103}
104
105impl SubgraphSchema {
106    pub(crate) fn new(schema: Valid<apollo_compiler::Schema>) -> Self {
107        let sdl = schema.serialize().no_indent().to_string();
108        Self {
109            schema: Arc::new(schema),
110            hash: SchemaHash::new(&sdl),
111        }
112    }
113}
114
115/// A fetch node.
116#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
117#[serde(rename_all = "camelCase")]
118pub(crate) struct FetchNode {
119    /// The name of the service or subgraph that the fetch is querying.
120    pub(crate) service_name: Arc<str>,
121
122    /// The data that is required for the subgraph fetch.
123    #[serde(skip_serializing_if = "Vec::is_empty")]
124    #[serde(default)]
125    pub(crate) requires: Vec<Selection>,
126
127    /// The variables that are used for the subgraph fetch.
128    pub(crate) variable_usages: Vec<Arc<str>>,
129
130    /// The GraphQL subquery that is used for the fetch.
131    pub(crate) operation: SubgraphOperation,
132
133    /// The GraphQL subquery operation name.
134    pub(crate) operation_name: Option<Arc<str>>,
135
136    /// The GraphQL operation kind that is used for the fetch.
137    pub(crate) operation_kind: OperationKind,
138
139    /// Optional id used by Deferred nodes
140    pub(crate) id: Option<String>,
141
142    // Optionally describes a number of "rewrites" that query plan executors should apply to the data that is sent as input of this fetch.
143    pub(crate) input_rewrites: Option<Vec<rewrites::DataRewrite>>,
144
145    // Optionally describes a number of "rewrites" to apply to the data that received from a fetch (and before it is applied to the current in-memory results).
146    pub(crate) output_rewrites: Option<Vec<rewrites::DataRewrite>>,
147
148    // Optionally describes a number of "rewrites" to apply to the data that has already been received further up the tree
149    pub(crate) context_rewrites: Option<Vec<rewrites::DataRewrite>>,
150
151    // hash for the query and relevant parts of the schema. if two different schemas provide the exact same types, fields and directives
152    // affecting the query, then they will have the same hash
153    #[serde(default)]
154    pub(crate) schema_aware_hash: Arc<QueryHash>,
155
156    // authorization metadata for the subgraph query
157    #[serde(default)]
158    pub(crate) authorization: Arc<CacheKeyMetadata>,
159}
160
161#[derive(Clone)]
162pub(crate) struct SubgraphOperation {
163    serialized: String,
164    /// Ideally this would be always present, but we don’t have access to the subgraph schemas
165    /// during `Deserialize`.
166    parsed: Option<Arc<Valid<ExecutableDocument>>>,
167}
168
169impl SubgraphOperation {
170    pub(crate) fn from_string(serialized: impl Into<String>) -> Self {
171        Self {
172            serialized: serialized.into(),
173            parsed: None,
174        }
175    }
176
177    pub(crate) fn from_parsed(parsed: impl Into<Arc<Valid<ExecutableDocument>>>) -> Self {
178        let parsed = parsed.into();
179        Self {
180            serialized: parsed.serialize().no_indent().to_string(),
181            parsed: Some(parsed),
182        }
183    }
184
185    pub(crate) fn as_serialized(&self) -> &str {
186        &self.serialized
187    }
188
189    pub(crate) fn init_parsed(
190        &mut self,
191        subgraph_schema: &Valid<apollo_compiler::Schema>,
192    ) -> Result<&Arc<Valid<ExecutableDocument>>, ValidationErrors> {
193        match &mut self.parsed {
194            Some(parsed) => Ok(parsed),
195            option => {
196                let parsed = Arc::new(ExecutableDocument::parse_and_validate(
197                    subgraph_schema,
198                    &self.serialized,
199                    "operation.graphql",
200                )?);
201                Ok(option.insert(parsed))
202            }
203        }
204    }
205
206    pub(crate) fn as_parsed(
207        &self,
208    ) -> Result<&Arc<Valid<ExecutableDocument>>, SubgraphOperationNotInitialized> {
209        self.parsed.as_ref().ok_or(SubgraphOperationNotInitialized)
210    }
211}
212
213/// Failed to call `SubgraphOperation::init_parsed` after creating a query plan
214#[derive(Debug, displaydoc::Display, thiserror::Error)]
215pub(crate) struct SubgraphOperationNotInitialized;
216
217impl SubgraphOperationNotInitialized {
218    pub(crate) fn into_graphql_errors(self) -> Vec<Error> {
219        vec![
220            graphql::Error::builder()
221                .extension_code(self.code())
222                .message(self.to_string())
223                .build(),
224        ]
225    }
226
227    pub(crate) fn code(&self) -> &'static str {
228        "SUBGRAPH_OPERATION_NOT_INITIALIZED"
229    }
230}
231
232impl Serialize for SubgraphOperation {
233    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
234    where
235        S: serde::Serializer,
236    {
237        self.as_serialized().serialize(serializer)
238    }
239}
240
241impl<'de> Deserialize<'de> for SubgraphOperation {
242    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
243    where
244        D: serde::Deserializer<'de>,
245    {
246        Ok(Self::from_string(String::deserialize(deserializer)?))
247    }
248}
249
250impl PartialEq for SubgraphOperation {
251    fn eq(&self, other: &Self) -> bool {
252        self.as_serialized() == other.as_serialized()
253    }
254}
255
256impl std::fmt::Debug for SubgraphOperation {
257    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
258        std::fmt::Debug::fmt(self.as_serialized(), f)
259    }
260}
261
262impl std::fmt::Display for SubgraphOperation {
263    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
264        std::fmt::Display::fmt(self.as_serialized(), f)
265    }
266}
267
268pub(crate) struct Variables {
269    pub(crate) variables: Object,
270    pub(crate) inverted_paths: Vec<Vec<Path>>,
271    pub(crate) contextual_arguments: Option<ContextualArguments>,
272}
273
274impl Variables {
275    #[instrument(skip_all, level = "debug", name = "make_variables")]
276    #[allow(clippy::too_many_arguments)]
277    pub(super) fn new(
278        requires: &[Selection],
279        variable_usages: &[Arc<str>],
280        data: &Value,
281        current_dir: &Path,
282        request: &Arc<http::Request<Request>>,
283        schema: &Schema,
284        input_rewrites: &Option<Vec<rewrites::DataRewrite>>,
285        context_rewrites: &Option<Vec<rewrites::DataRewrite>>,
286    ) -> Option<Variables> {
287        let body = request.body();
288        let mut subgraph_context = SubgraphContext::new(data, schema, context_rewrites);
289        if !requires.is_empty() {
290            let mut variables = Object::with_capacity(1 + variable_usages.len());
291
292            variables.extend(variable_usages.iter().filter_map(|key| {
293                body.variables
294                    .get_key_value(key.as_ref())
295                    .map(|(variable_key, value)| (variable_key.clone(), value.clone()))
296            }));
297
298            let mut inverted_paths: Vec<Vec<Path>> = Vec::new();
299            let mut values: IndexSet<Value> = IndexSet::default();
300            data.select_values_and_paths(schema, current_dir, |path, value| {
301                // first get contextual values that are required
302                if let Some(context) = subgraph_context.as_mut() {
303                    context.execute_on_path(path);
304                }
305
306                let mut value = execute_selection_set(value, requires, schema, None);
307                if value.as_object().map(|o| !o.is_empty()).unwrap_or(false) {
308                    rewrites::apply_rewrites(schema, &mut value, input_rewrites);
309                    match values.get_index_of(&value) {
310                        Some(index) => {
311                            inverted_paths[index].push(path.clone());
312                        }
313                        None => {
314                            inverted_paths.push(vec![path.clone()]);
315                            values.insert(value);
316                            debug_assert!(inverted_paths.len() == values.len());
317                        }
318                    }
319                }
320            });
321
322            if values.is_empty() {
323                return None;
324            }
325
326            let representations = Value::Array(Vec::from_iter(values));
327            let contextual_arguments = match subgraph_context.as_mut() {
328                Some(context) => context.add_variables_and_get_args(&mut variables),
329                None => None,
330            };
331
332            variables.insert("representations", representations);
333            Some(Variables {
334                variables,
335                inverted_paths,
336                contextual_arguments,
337            })
338        } else {
339            // with nested operations (Query or Mutation has an operation returning a Query or Mutation),
340            // when the first fetch fails, the query plan will still execute up until the second fetch,
341            // where `requires` is empty (not a federated fetch), the current dir is not emmpty (child of
342            // the previous operation field) and the data is null. In that case, we recognize that we
343            // should not perform the next fetch
344            if !current_dir.is_empty()
345                && data
346                    .get_path(schema, current_dir)
347                    .map(|value| value.is_null())
348                    .unwrap_or(true)
349            {
350                return None;
351            }
352
353            Some(Variables {
354                variables: variable_usages
355                    .iter()
356                    .filter_map(|key| {
357                        body.variables
358                            .get_key_value(key.as_ref())
359                            .map(|(variable_key, value)| (variable_key.clone(), value.clone()))
360                    })
361                    .collect::<Object>(),
362                inverted_paths: Vec::new(),
363                contextual_arguments: None,
364            })
365        }
366    }
367}
368
369impl FetchNode {
370    #[allow(clippy::too_many_arguments)]
371    pub(crate) async fn fetch_node<'a>(
372        &'a self,
373        parameters: &'a ExecutionParameters<'a>,
374        data: &'a Value,
375        current_dir: &'a Path,
376    ) -> (Value, Vec<Error>) {
377        let FetchNode {
378            operation,
379            operation_kind,
380            operation_name,
381            service_name,
382            ..
383        } = self;
384
385        let Variables {
386            variables,
387            inverted_paths: paths,
388            contextual_arguments,
389        } = match Variables::new(
390            &self.requires,
391            &self.variable_usages,
392            data,
393            current_dir,
394            // Needs the original request here
395            parameters.supergraph_request,
396            parameters.schema,
397            &self.input_rewrites,
398            &self.context_rewrites,
399        ) {
400            Some(variables) => variables,
401            None => {
402                return (Value::Object(Object::default()), Vec::new());
403            }
404        };
405
406        let alias_query_string; // this exists outside the if block to allow the as_str() to be longer lived
407        let aliased_operation = if let Some(ctx_arg) = contextual_arguments {
408            if let Some(subgraph_schema) =
409                parameters.subgraph_schemas.get(&service_name.to_string())
410            {
411                match build_operation_with_aliasing(operation, &ctx_arg, &subgraph_schema.schema) {
412                    Ok(op) => {
413                        alias_query_string = op.serialize().no_indent().to_string();
414                        alias_query_string.as_str()
415                    }
416                    Err(errors) => {
417                        tracing::debug!(
418                            "couldn't generate a valid executable document? {:?}",
419                            errors
420                        );
421                        operation.as_serialized()
422                    }
423                }
424            } else {
425                tracing::debug!(
426                    "couldn't find a subgraph schema for service {:?}",
427                    &service_name
428                );
429                operation.as_serialized()
430            }
431        } else {
432            operation.as_serialized()
433        };
434
435        let mut subgraph_request = SubgraphRequest::builder()
436            .supergraph_request(parameters.supergraph_request.clone())
437            .subgraph_request(
438                http_ext::Request::builder()
439                    .method(http::Method::POST)
440                    .uri(
441                        parameters
442                            .schema
443                            .subgraph_url(service_name)
444                            .unwrap_or_else(|| {
445                                panic!(
446                                    "schema uri for subgraph '{service_name}' should already have been checked"
447                                )
448                            })
449                            .clone(),
450                    )
451                    .body(
452                        Request::builder()
453                            .query(aliased_operation)
454                            .and_operation_name(operation_name.as_ref().map(|n| n.to_string()))
455                            .variables(variables.clone())
456                            .build(),
457                    )
458                    .build()
459                    .expect("it won't fail because the url is correct and already checked; qed"),
460            )
461            .subgraph_name(self.service_name.to_string())
462            .operation_kind(*operation_kind)
463            .context(parameters.context.clone())
464            .build();
465        subgraph_request.query_hash = self.schema_aware_hash.clone();
466        subgraph_request.authorization = self.authorization.clone();
467
468        let service = parameters
469            .service_factory
470            .create(service_name)
471            .expect("we already checked that the service exists during planning; qed");
472
473        let (_parts, response) = match service
474            .oneshot(subgraph_request)
475            .instrument(tracing::trace_span!("subfetch_stream"))
476            .await
477            // TODO this is a problem since it restores details about failed service
478            // when errors have been redacted in the include_subgraph_errors module.
479            // Unfortunately, not easy to fix here, because at this point we don't
480            // know if we should be redacting errors for this subgraph...
481            .map_err(|e| match e.downcast::<FetchError>() {
482                Ok(inner) => match *inner {
483                    FetchError::SubrequestHttpError { .. } => *inner,
484                    _ => FetchError::SubrequestHttpError {
485                        status_code: None,
486                        service: service_name.to_string(),
487                        reason: inner.to_string(),
488                    },
489                },
490                Err(e) => FetchError::SubrequestHttpError {
491                    status_code: None,
492                    service: service_name.to_string(),
493                    reason: e.to_string(),
494                },
495            }) {
496            Err(e) => {
497                return (
498                    Value::default(),
499                    vec![e.to_graphql_error(Some(current_dir.to_owned()))],
500                );
501            }
502            Ok(res) => res.response.into_parts(),
503        };
504
505        super::log::trace_subfetch(
506            service_name,
507            operation.as_serialized(),
508            &variables,
509            &response,
510        );
511
512        if !response.is_primary() {
513            return (
514                Value::default(),
515                vec![
516                    FetchError::SubrequestUnexpectedPatchResponse {
517                        service: service_name.to_string(),
518                    }
519                    .to_graphql_error(Some(current_dir.to_owned())),
520                ],
521            );
522        }
523
524        let (value, errors) =
525            self.response_at_path(parameters.schema, current_dir, paths, response);
526        if let Some(id) = &self.id {
527            if let Some(sender) = parameters.deferred_fetches.get(id.as_str()) {
528                u64_counter!(
529                    "apollo.router.operations.defer.fetch",
530                    "Number of deferred responses fetched from subgraphs",
531                    1
532                );
533                if let Err(e) = sender.clone().send((value.clone(), errors.clone())) {
534                    tracing::error!(
535                        "error sending fetch result at path {} and id {:?} for deferred response building: {}",
536                        current_dir,
537                        self.id,
538                        e
539                    );
540                }
541            }
542        }
543        (value, errors)
544    }
545
546    #[instrument(skip_all, level = "debug", name = "response_insert")]
547    fn response_at_path<'a>(
548        &'a self,
549        schema: &Schema,
550        current_dir: &'a Path,
551        inverted_paths: Vec<Vec<Path>>,
552        response: graphql::Response,
553    ) -> (Value, Vec<Error>) {
554        if !self.requires.is_empty() {
555            let entities_path = Path(vec![json_ext::PathElement::Key(
556                "_entities".to_string(),
557                None,
558            )]);
559
560            let mut errors: Vec<Error> = vec![];
561            for mut error in response.errors {
562                // the locations correspond to the subgraph query and cannot be linked to locations
563                // in the client query, so we remove them
564                error.locations = Vec::new();
565
566                // errors with path should be updated to the path of the entity they target
567                if let Some(ref path) = error.path {
568                    if path.starts_with(&entities_path) {
569                        // the error's path has the format '/_entities/1/other' so we ignore the
570                        // first element and then get the index
571                        match path.0.get(1) {
572                            Some(json_ext::PathElement::Index(i)) => {
573                                for values_path in
574                                    inverted_paths.get(*i).iter().flat_map(|v| v.iter())
575                                {
576                                    errors.push(Error {
577                                        locations: error.locations.clone(),
578                                        // append to the entitiy's path the error's path without
579                                        //`_entities` and the index
580                                        path: Some(Path::from_iter(
581                                            values_path.0.iter().chain(&path.0[2..]).cloned(),
582                                        )),
583                                        message: error.message.clone(),
584                                        extensions: error.extensions.clone(),
585                                    })
586                                }
587                            }
588                            _ => {
589                                error.path = Some(current_dir.clone());
590                                errors.push(error)
591                            }
592                        }
593                    } else {
594                        error.path = Some(current_dir.clone());
595                        errors.push(error);
596                    }
597                } else {
598                    error.path = Some(current_dir.clone());
599                    errors.push(error);
600                }
601            }
602
603            // we have to nest conditions and do early returns here
604            // because we need to take ownership of the inner value
605            if let Some(Value::Object(mut map)) = response.data {
606                if let Some(entities) = map.remove("_entities") {
607                    tracing::trace!("received entities: {:?}", &entities);
608
609                    if let Value::Array(array) = entities {
610                        let mut value = Value::default();
611
612                        for (index, mut entity) in array.into_iter().enumerate() {
613                            rewrites::apply_rewrites(schema, &mut entity, &self.output_rewrites);
614
615                            if let Some(paths) = inverted_paths.get(index) {
616                                if paths.len() > 1 {
617                                    for path in &paths[1..] {
618                                        let _ = value.insert(path, entity.clone());
619                                    }
620                                }
621
622                                if let Some(path) = paths.first() {
623                                    let _ = value.insert(path, entity);
624                                }
625                            }
626                        }
627                        return (value, errors);
628                    }
629                }
630            }
631
632            // if we get here, it means that the response was missing the `_entities` key
633            // This can happen if the subgraph failed during query execution e.g. for permissions checks.
634            // In this case we should add an additional error because the subgraph should have returned an error that will be bubbled up to the client.
635            // However, if they have not then print a warning to the logs.
636            if errors.is_empty() {
637                tracing::warn!(
638                    "Subgraph response from '{}' was missing key `_entities` and had no errors. This is likely a bug in the subgraph.",
639                    self.service_name
640                );
641            }
642
643            (Value::Null, errors)
644        } else {
645            let current_slice =
646                if matches!(current_dir.last(), Some(&json_ext::PathElement::Flatten(_))) {
647                    &current_dir.0[..current_dir.0.len() - 1]
648                } else {
649                    &current_dir.0[..]
650                };
651
652            let errors: Vec<Error> = response
653                .errors
654                .into_iter()
655                .map(|error| {
656                    let path = error
657                        .path
658                        .as_ref()
659                        .map(|path| {
660                            Path::from_iter(current_slice.iter().chain(path.iter()).cloned())
661                        })
662                        .unwrap_or_else(|| current_dir.clone());
663
664                    Error {
665                        locations: error.locations,
666                        path: Some(path),
667                        message: error.message,
668                        extensions: error.extensions,
669                    }
670                })
671                .collect();
672            let mut data = response.data.unwrap_or_default();
673            rewrites::apply_rewrites(schema, &mut data, &self.output_rewrites);
674            (Value::from_path(current_dir, data), errors)
675        }
676    }
677
678    #[cfg(test)]
679    pub(crate) fn service_name(&self) -> &str {
680        &self.service_name
681    }
682
683    pub(crate) fn operation_kind(&self) -> &OperationKind {
684        &self.operation_kind
685    }
686
687    pub(crate) fn init_parsed_operation(
688        &mut self,
689        subgraph_schemas: &SubgraphSchemas,
690    ) -> Result<(), ValidationErrors> {
691        let schema = &subgraph_schemas[self.service_name.as_ref()];
692        self.operation.init_parsed(&schema.schema)?;
693        Ok(())
694    }
695
696    pub(crate) fn init_parsed_operation_and_hash_subquery(
697        &mut self,
698        subgraph_schemas: &SubgraphSchemas,
699    ) -> Result<(), ValidationErrors> {
700        let schema = &subgraph_schemas[self.service_name.as_ref()];
701        self.operation.init_parsed(&schema.schema)?;
702        self.schema_aware_hash = Arc::new(schema.hash.operation_hash(
703            self.operation.as_serialized(),
704            self.operation_name.as_deref(),
705        ));
706        Ok(())
707    }
708
709    pub(crate) fn extract_authorization_metadata(
710        &mut self,
711        schema: &Valid<apollo_compiler::Schema>,
712        global_authorisation_cache_key: &CacheKeyMetadata,
713    ) {
714        let doc = ExecutableDocument::parse(
715            schema,
716            self.operation.as_serialized().to_string(),
717            "query.graphql",
718        )
719        // Assume query planing creates a valid document: ignore parse errors
720        .unwrap_or_else(|invalid| invalid.partial);
721        let subgraph_query_cache_key = AuthorizationPlugin::generate_cache_metadata(
722            &doc,
723            self.operation_name.as_deref(),
724            schema,
725            !self.requires.is_empty(),
726        );
727
728        // we need to intersect the cache keys because the global key already takes into account
729        // the scopes and policies from the client request
730        self.authorization = Arc::new(AuthorizationPlugin::intersect_cache_keys_subgraph(
731            global_authorisation_cache_key,
732            &subgraph_query_cache_key,
733        ));
734    }
735}