apollo_router/query_planner/
fetch.rs

1use std::fmt::Display;
2use std::sync::Arc;
3
4use apollo_compiler::ExecutableDocument;
5use apollo_compiler::ast;
6use apollo_compiler::collections::HashMap;
7use apollo_compiler::validation::Valid;
8use apollo_federation::query_plan::requires_selection;
9use apollo_federation::query_plan::serializable_document::SerializableDocument;
10use indexmap::IndexSet;
11use serde::Deserialize;
12use serde::Serialize;
13use serde_json_bytes::ByteString;
14use serde_json_bytes::Map;
15use tokio::sync::broadcast::Sender;
16use tower::ServiceExt;
17use tracing::Instrument;
18use tracing::instrument;
19
20use super::rewrites;
21use super::selection::execute_selection_set;
22use super::subgraph_context::ContextualArguments;
23use super::subgraph_context::SubgraphContext;
24use crate::error::Error;
25use crate::error::FetchError;
26use crate::error::ValidationErrors;
27use crate::graphql;
28use crate::graphql::Request;
29use crate::json_ext;
30use crate::json_ext::Object;
31use crate::json_ext::Path;
32use crate::json_ext::Value;
33use crate::json_ext::ValueExt;
34use crate::plugins::authorization::AuthorizationPlugin;
35use crate::plugins::authorization::CacheKeyMetadata;
36use crate::services::SubgraphRequest;
37use crate::services::fetch::ErrorMapping;
38use crate::services::subgraph::BoxService;
39use crate::spec::QueryHash;
40use crate::spec::Schema;
41use crate::spec::SchemaHash;
42
/// GraphQL operation type.
// Serialized and deserialized by serde with camelCase variant names.
// `Query` is the value produced by `Default`.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Hash, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
#[non_exhaustive]
#[cfg_attr(test, derive(schemars::JsonSchema))]
pub enum OperationKind {
    // A `query` operation (the default kind).
    #[default]
    Query,
    // A `mutation` operation.
    Mutation,
    // A `subscription` operation.
    Subscription,
}
54
55impl Display for OperationKind {
56    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
57        write!(f, "{}", self.default_type_name())
58    }
59}
60
61impl OperationKind {
62    pub(crate) const fn default_type_name(&self) -> &'static str {
63        match self {
64            OperationKind::Query => "Query",
65            OperationKind::Mutation => "Mutation",
66            OperationKind::Subscription => "Subscription",
67        }
68    }
69
70    /// Only for apollo studio exporter
71    pub(crate) const fn as_apollo_operation_type(&self) -> &'static str {
72        match self {
73            OperationKind::Query => "query",
74            OperationKind::Mutation => "mutation",
75            OperationKind::Subscription => "subscription",
76        }
77    }
78}
79
80impl From<OperationKind> for ast::OperationType {
81    fn from(value: OperationKind) -> Self {
82        match value {
83            OperationKind::Query => ast::OperationType::Query,
84            OperationKind::Mutation => ast::OperationType::Mutation,
85            OperationKind::Subscription => ast::OperationType::Subscription,
86        }
87    }
88}
89
90impl From<ast::OperationType> for OperationKind {
91    fn from(value: ast::OperationType) -> Self {
92        match value {
93            ast::OperationType::Query => OperationKind::Query,
94            ast::OperationType::Mutation => OperationKind::Mutation,
95            ast::OperationType::Subscription => OperationKind::Subscription,
96        }
97    }
98}
99
/// Map from a name to its [`SubgraphSchema`]. Within this file it is indexed
/// by a fetch node's `service_name`.
pub(crate) type SubgraphSchemas = HashMap<String, SubgraphSchema>;
101
/// A validated subgraph schema together with a hash of its contents.
pub(crate) struct SubgraphSchema {
    /// The validated schema, shared behind an `Arc`.
    pub(crate) schema: Arc<Valid<apollo_compiler::Schema>>,
    // TODO: Ideally should have separate nominal type for subgraph's schema hash
    /// Hash of the schema; `SubgraphSchema::new` derives it from the schema's
    /// serialized text.
    pub(crate) hash: SchemaHash,
}
107
108impl SubgraphSchema {
109    pub(crate) fn new(schema: Valid<apollo_compiler::Schema>) -> Self {
110        let sdl = schema.serialize().no_indent().to_string();
111        Self {
112            schema: Arc::new(schema),
113            hash: SchemaHash::new(&sdl),
114        }
115    }
116}
117
/// A fetch node.
///
/// Describes one operation to send to a subgraph. Fields are (de)serialized
/// by serde using camelCase names.
#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct FetchNode {
    /// The name of the service or subgraph that the fetch is querying.
    pub(crate) service_name: Arc<str>,

    /// The data that is required for the subgraph fetch.
    ///
    /// When this is non-empty the response is expected to carry its data under
    /// the `_entities` key (see `response_at_path`). Omitted from the
    /// serialized form when empty and defaulted to empty when absent.
    #[serde(skip_serializing_if = "Vec::is_empty")]
    #[serde(default)]
    pub(crate) requires: Vec<requires_selection::Selection>,

    /// The variables that are used for the subgraph fetch.
    pub(crate) variable_usages: Vec<Arc<str>>,

    /// The GraphQL subquery that is used for the fetch.
    pub(crate) operation: SerializableDocument,

    /// The GraphQL subquery operation name.
    pub(crate) operation_name: Option<Arc<str>>,

    /// The GraphQL operation kind that is used for the fetch.
    pub(crate) operation_kind: OperationKind,

    /// Optional id used by Deferred nodes
    pub(crate) id: Option<String>,

    /// Optionally describes a number of "rewrites" that query plan executors should apply to the
    /// data that is sent as input of this fetch.
    pub(crate) input_rewrites: Option<Vec<rewrites::DataRewrite>>,

    /// Optionally describes a number of "rewrites" to apply to the data that is received from a
    /// fetch (and before it is applied to the current in-memory results).
    pub(crate) output_rewrites: Option<Vec<rewrites::DataRewrite>>,

    /// Optionally describes a number of "rewrites" to apply to the data that has already been
    /// received further up the tree.
    pub(crate) context_rewrites: Option<Vec<rewrites::DataRewrite>>,

    /// Hash for the query and relevant parts of the schema. If two different schemas provide the
    /// exact same types, fields and directives affecting the query, then they will have the same
    /// hash. Defaulted when absent from the serialized form; set by
    /// `init_parsed_operation_and_hash_subquery`.
    #[serde(default)]
    pub(crate) schema_aware_hash: Arc<QueryHash>,

    /// Authorization metadata for the subgraph query. Defaulted when absent from the serialized
    /// form; set by `extract_authorization_metadata`.
    #[serde(default)]
    pub(crate) authorization: Arc<CacheKeyMetadata>,
}
163
/// The variables computed for a subgraph fetch, plus the bookkeeping needed
/// to map the response back into the in-memory result.
#[derive(Default)]
pub(crate) struct Variables {
    /// The variables object for the fetch. For fetches with `requires`, it
    /// also contains the `representations` array.
    pub(crate) variables: Object,
    /// For each entry of `representations` (same index), every path in the
    /// data at which that representation was selected. Empty for fetches
    /// without `requires`.
    pub(crate) inverted_paths: Vec<Vec<Path>>,
    /// Contextual arguments produced by the subgraph context, if any.
    pub(crate) contextual_arguments: Option<ContextualArguments>,
}
170
171impl Variables {
172    #[instrument(skip_all, level = "debug", name = "make_variables")]
173    #[allow(clippy::too_many_arguments)]
174    pub(crate) fn new(
175        requires: &[requires_selection::Selection],
176        variable_usages: &[Arc<str>],
177        data: &Value,
178        current_dir: &Path,
179        request: &Arc<http::Request<Request>>,
180        schema: &Schema,
181        input_rewrites: &Option<Vec<rewrites::DataRewrite>>,
182        context_rewrites: &Option<Vec<rewrites::DataRewrite>>,
183    ) -> Option<Variables> {
184        let body = request.body();
185        let mut subgraph_context = SubgraphContext::new(data, schema, context_rewrites);
186        if !requires.is_empty() {
187            let mut variables = Object::with_capacity(1 + variable_usages.len());
188
189            variables.extend(variable_usages.iter().filter_map(|key| {
190                body.variables
191                    .get_key_value(key.as_ref())
192                    .map(|(variable_key, value)| (variable_key.clone(), value.clone()))
193            }));
194
195            let mut inverted_paths: Vec<Vec<Path>> = Vec::new();
196            let mut values: IndexSet<Value> = IndexSet::default();
197            data.select_values_and_paths(schema, current_dir, |path, value| {
198                // first get contextual values that are required
199                if let Some(context) = subgraph_context.as_mut() {
200                    context.execute_on_path(path);
201                }
202
203                let mut value = execute_selection_set(value, requires, schema, None);
204                if value.as_object().map(|o| !o.is_empty()).unwrap_or(false) {
205                    rewrites::apply_rewrites(schema, &mut value, input_rewrites);
206                    match values.get_index_of(&value) {
207                        Some(index) => {
208                            inverted_paths[index].push(path.clone());
209                        }
210                        None => {
211                            inverted_paths.push(vec![path.clone()]);
212                            values.insert(value);
213                            debug_assert!(inverted_paths.len() == values.len());
214                        }
215                    }
216                }
217            });
218
219            if values.is_empty() {
220                return None;
221            }
222
223            let representations = Value::Array(Vec::from_iter(values));
224            let contextual_arguments = match subgraph_context.as_mut() {
225                Some(context) => context.add_variables_and_get_args(&mut variables),
226                None => None,
227            };
228
229            variables.insert("representations", representations);
230            Some(Variables {
231                variables,
232                inverted_paths,
233                contextual_arguments,
234            })
235        } else {
236            // with nested operations (Query or Mutation has an operation returning a Query or Mutation),
237            // when the first fetch fails, the query plan will still execute up until the second fetch,
238            // where `requires` is empty (not a federated fetch), the current dir is not emmpty (child of
239            // the previous operation field) and the data is null. In that case, we recognize that we
240            // should not perform the next fetch
241            if !current_dir.is_empty()
242                && data
243                    .get_path(schema, current_dir)
244                    .map(|value| value.is_null())
245                    .unwrap_or(true)
246            {
247                return None;
248            }
249
250            Some(Variables {
251                variables: variable_usages
252                    .iter()
253                    .filter_map(|key| {
254                        body.variables
255                            .get_key_value(key.as_ref())
256                            .map(|(variable_key, value)| (variable_key.clone(), value.clone()))
257                    })
258                    .collect::<Object>(),
259                inverted_paths: Vec::new(),
260                contextual_arguments: None,
261            })
262        }
263    }
264}
265
266impl FetchNode {
267    #[allow(clippy::too_many_arguments)]
268    pub(crate) async fn subgraph_fetch(
269        &self,
270        service: BoxService,
271        subgraph_request: SubgraphRequest,
272        current_dir: &Path,
273        schema: &Schema,
274        paths: Vec<Vec<Path>>,
275        operation_str: &str,
276        variables: Map<ByteString, Value>,
277    ) -> (Value, Vec<Error>) {
278        let (_parts, response) = match service
279            .oneshot(subgraph_request)
280            .instrument(tracing::trace_span!("subfetch_stream"))
281            .await
282            .map_to_graphql_error(self.service_name.to_string(), current_dir)
283        {
284            Err(e) => {
285                return (Value::default(), vec![e]);
286            }
287            Ok(res) => res.response.into_parts(),
288        };
289
290        super::log::trace_subfetch(&self.service_name, operation_str, &variables, &response);
291
292        if !response.is_primary() {
293            return (
294                Value::default(),
295                vec![
296                    FetchError::SubrequestUnexpectedPatchResponse {
297                        service: self.service_name.to_string(),
298                    }
299                    .to_graphql_error(Some(current_dir.to_owned())),
300                ],
301            );
302        }
303
304        let (value, errors) = self.response_at_path(schema, current_dir, paths, response);
305
306        (value, errors)
307    }
308
309    pub(crate) fn deferred_fetches(
310        current_dir: &Path,
311        id: &Option<String>,
312        deferred_fetches: &std::collections::HashMap<String, Sender<(Value, Vec<Error>)>>,
313        value: &Value,
314        errors: &[Error],
315    ) {
316        if let Some(id) = id
317            && let Some(sender) = deferred_fetches.get(id.as_str())
318        {
319            u64_counter!(
320                "apollo.router.operations.defer.fetch",
321                "Number of deferred responses fetched from subgraphs",
322                1
323            );
324            if let Err(e) = sender.clone().send((value.clone(), Vec::from(errors))) {
325                tracing::error!(
326                    "error sending fetch result at path {} and id {:?} for deferred response building: {}",
327                    current_dir,
328                    id,
329                    e
330                );
331            }
332        }
333    }
334
    /// Converts a subgraph response into the value and errors to merge into the result.
    ///
    /// * When `self.requires` is non-empty, the response is expected to contain an `_entities`
    ///   array. The entity at index `i` is (after `output_rewrites`) inserted at every path of
    ///   `inverted_paths[i]`. Errors whose path starts with `_entities/<index>` are re-targeted
    ///   to each of those paths; every other error is attached to `current_dir`. Error
    ///   locations are cleared. If `_entities` is missing or is not an array, `Value::Null` is
    ///   returned along with the collected errors.
    /// * Otherwise the response data (after `output_rewrites`) is nested under `current_dir`,
    ///   and each error path is prefixed with `current_dir` (minus a trailing flatten element).
    ///
    /// Returns the value to merge and the rewritten errors.
    #[instrument(skip_all, level = "debug", name = "response_insert")]
    pub(crate) fn response_at_path<'a>(
        &'a self,
        schema: &Schema,
        current_dir: &'a Path,
        inverted_paths: Vec<Vec<Path>>,
        response: graphql::Response,
    ) -> (Value, Vec<Error>) {
        if !self.requires.is_empty() {
            // Path prefix shared by every error that targets a specific entity.
            let entities_path = Path(vec![json_ext::PathElement::Key(
                "_entities".to_string(),
                None,
            )]);

            let mut errors: Vec<Error> = vec![];
            for mut error in response.errors {
                // the locations correspond to the subgraph query and cannot be linked to locations
                // in the client query, so we remove them
                error.locations = Vec::new();

                // errors with path should be updated to the path of the entity they target
                if let Some(ref path) = error.path {
                    if path.starts_with(&entities_path) {
                        // the error's path has the format '/_entities/1/other' so we ignore the
                        // first element and then get the index
                        match path.0.get(1) {
                            Some(json_ext::PathElement::Index(i)) => {
                                // One error is emitted per path the entity was selected from.
                                // NOTE(review): when `inverted_paths` has no entry at index `i`
                                // this loop runs zero times and the error is dropped without
                                // being reported — confirm this is intended.
                                for values_path in
                                    inverted_paths.get(*i).iter().flat_map(|v| v.iter())
                                {
                                    errors.push(
                                        Error::builder()
                                            .locations(error.locations.clone())
                                            // append to the entity's path the error's path without
                                            //`_entities` and the index
                                            .path(Path::from_iter(
                                                values_path.0.iter().chain(&path.0[2..]).cloned(),
                                            ))
                                            .message(error.message.clone())
                                            .and_extension_code(error.extension_code())
                                            .extensions(error.extensions.clone())
                                            // re-use the original ID so we don't double count this error
                                            .apollo_id(error.apollo_id())
                                            .build(),
                                    )
                                }
                            }
                            // `_entities` is not followed by an index: fall back to the
                            // current directory.
                            _ => {
                                error.path = Some(current_dir.clone());
                                errors.push(error)
                            }
                        }
                    } else {
                        // The path does not target an entity: attach the error to the current
                        // directory.
                        error.path = Some(current_dir.clone());
                        errors.push(error);
                    }
                } else {
                    // No path at all: attach the error to the current directory.
                    error.path = Some(current_dir.clone());
                    errors.push(error);
                }
            }

            // we have to nest conditions and do early returns here
            // because we need to take ownership of the inner value
            if let Some(Value::Object(mut map)) = response.data
                && let Some(entities) = map.remove("_entities")
            {
                tracing::trace!("received entities: {:?}", &entities);

                if let Value::Array(array) = entities {
                    let mut value = Value::default();

                    for (index, mut entity) in array.into_iter().enumerate() {
                        rewrites::apply_rewrites(schema, &mut entity, &self.output_rewrites);

                        // Entities without a matching `inverted_paths` entry are skipped.
                        if let Some(paths) = inverted_paths.get(index) {
                            // The entity is cloned for every path but the first, so that the
                            // original can be moved into the first path below. The results of
                            // the inserts are deliberately discarded.
                            if paths.len() > 1 {
                                for path in &paths[1..] {
                                    let _ = value.insert(path, entity.clone());
                                }
                            }

                            if let Some(path) = paths.first() {
                                let _ = value.insert(path, entity);
                            }
                        }
                    }
                    return (value, errors);
                }
            }

            // if we get here, it means that the response was missing the `_entities` key
            // (or that `_entities` was not an array).
            // This can happen if the subgraph failed during query execution e.g. for permissions checks.
            // In this case we should not add an additional error because the subgraph should have
            // returned an error that will be bubbled up to the client.
            // However, if it has not, then print a warning to the logs.
            if errors.is_empty() {
                tracing::warn!(
                    "Subgraph response from '{}' was missing key `_entities` and had no errors. This is likely a bug in the subgraph.",
                    self.service_name
                );
            }

            (Value::Null, errors)
        } else {
            // Prefix used for error paths: `current_dir` without its trailing flatten element,
            // when it ends with one.
            let current_slice =
                if matches!(current_dir.last(), Some(&json_ext::PathElement::Flatten(_))) {
                    &current_dir.0[..current_dir.0.len() - 1]
                } else {
                    &current_dir.0[..]
                };

            let errors: Vec<Error> = response
                .errors
                .into_iter()
                .map(|error| {
                    // Errors with a path are re-rooted under the current directory; errors
                    // without one are attached to the current directory itself.
                    let path = error
                        .path
                        .as_ref()
                        .map(|path| {
                            Path::from_iter(current_slice.iter().chain(path.iter()).cloned())
                        })
                        .unwrap_or_else(|| current_dir.clone());

                    Error::builder()
                        .locations(error.locations.clone())
                        .path(path)
                        .message(error.message.clone())
                        .and_extension_code(error.extension_code())
                        .extensions(error.extensions.clone())
                        // re-use the original ID, as in the entity branch above
                        .apollo_id(error.apollo_id())
                        .build()
                })
                .collect();
            // Missing data falls back to the default `Value`.
            let mut data = response.data.unwrap_or_default();
            rewrites::apply_rewrites(schema, &mut data, &self.output_rewrites);
            (Value::from_path(current_dir, data), errors)
        }
    }
473
    /// Returns the name of the service or subgraph this fetch targets.
    /// Only compiled in test builds.
    #[cfg(test)]
    pub(crate) fn service_name(&self) -> &str {
        &self.service_name
    }
478
    /// Returns the GraphQL operation kind used for this fetch.
    pub(crate) fn operation_kind(&self) -> &OperationKind {
        &self.operation_kind
    }
482
483    pub(crate) fn init_parsed_operation(
484        &mut self,
485        subgraph_schemas: &SubgraphSchemas,
486    ) -> Result<(), ValidationErrors> {
487        let schema = &subgraph_schemas[self.service_name.as_ref()];
488        self.operation.init_parsed(&schema.schema)?;
489        Ok(())
490    }
491
492    pub(crate) fn init_parsed_operation_and_hash_subquery(
493        &mut self,
494        subgraph_schemas: &SubgraphSchemas,
495    ) -> Result<(), ValidationErrors> {
496        let schema = &subgraph_schemas[self.service_name.as_ref()];
497        self.operation.init_parsed(&schema.schema)?;
498        self.schema_aware_hash = Arc::new(schema.hash.operation_hash(
499            self.operation.as_serialized(),
500            self.operation_name.as_deref(),
501        ));
502        Ok(())
503    }
504
505    pub(crate) fn extract_authorization_metadata(
506        &mut self,
507        schema: &Valid<apollo_compiler::Schema>,
508        global_authorisation_cache_key: &CacheKeyMetadata,
509    ) {
510        let doc = ExecutableDocument::parse(
511            schema,
512            self.operation.as_serialized().to_string(),
513            "query.graphql",
514        )
515        // Assume query planing creates a valid document: ignore parse errors
516        .unwrap_or_else(|invalid| invalid.partial);
517        let subgraph_query_cache_key = AuthorizationPlugin::generate_cache_metadata(
518            &doc,
519            self.operation_name.as_deref(),
520            schema,
521            !self.requires.is_empty(),
522        );
523
524        // we need to intersect the cache keys because the global key already takes into account
525        // the scopes and policies from the client request
526        self.authorization = Arc::new(AuthorizationPlugin::intersect_cache_keys_subgraph(
527            global_authorisation_cache_key,
528            &subgraph_query_cache_key,
529        ));
530    }
531}