Skip to main content

apollo_router/query_planner/
fetch.rs

1use std::fmt::Display;
2use std::sync::Arc;
3
4use apollo_compiler::ExecutableDocument;
5use apollo_compiler::ast;
6use apollo_compiler::collections::HashMap;
7use apollo_compiler::validation::Valid;
8use apollo_federation::query_plan::requires_selection;
9use apollo_federation::query_plan::serializable_document::SerializableDocument;
10use indexmap::IndexSet;
11use serde::Deserialize;
12use serde::Serialize;
13use serde_json_bytes::ByteString;
14use serde_json_bytes::Map;
15use tokio::sync::broadcast::Sender;
16use tower::ServiceExt;
17use tracing::Instrument;
18use tracing::instrument;
19
20use super::rewrites;
21use super::selection::execute_selection_set;
22use super::subgraph_context::ContextualArguments;
23use super::subgraph_context::SubgraphContext;
24use crate::error::Error;
25use crate::error::FetchError;
26use crate::error::ValidationErrors;
27use crate::graphql;
28use crate::graphql::Request;
29use crate::json_ext;
30use crate::json_ext::Object;
31use crate::json_ext::Path;
32use crate::json_ext::Value;
33use crate::json_ext::ValueExt;
34use crate::plugins::authorization::AuthorizationPlugin;
35use crate::plugins::authorization::CacheKeyMetadata;
36use crate::services::SubgraphRequest;
37use crate::services::fetch::ErrorMapping;
38use crate::services::subgraph::BoxService;
39use crate::spec::QueryHash;
40use crate::spec::Schema;
41use crate::spec::SchemaHash;
42
43/// GraphQL operation type.
44#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Hash, Deserialize, Serialize)]
45#[serde(rename_all = "camelCase")]
46#[non_exhaustive]
47#[cfg_attr(test, derive(schemars::JsonSchema))]
48pub enum OperationKind {
49    #[default]
50    Query,
51    Mutation,
52    Subscription,
53}
54
55impl Display for OperationKind {
56    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
57        write!(f, "{}", self.default_type_name())
58    }
59}
60
61impl OperationKind {
62    pub(crate) const fn default_type_name(&self) -> &'static str {
63        match self {
64            OperationKind::Query => "Query",
65            OperationKind::Mutation => "Mutation",
66            OperationKind::Subscription => "Subscription",
67        }
68    }
69
70    /// Only for apollo studio exporter
71    pub(crate) const fn as_apollo_operation_type(&self) -> &'static str {
72        match self {
73            OperationKind::Query => "query",
74            OperationKind::Mutation => "mutation",
75            OperationKind::Subscription => "subscription",
76        }
77    }
78}
79
80impl From<OperationKind> for ast::OperationType {
81    fn from(value: OperationKind) -> Self {
82        match value {
83            OperationKind::Query => ast::OperationType::Query,
84            OperationKind::Mutation => ast::OperationType::Mutation,
85            OperationKind::Subscription => ast::OperationType::Subscription,
86        }
87    }
88}
89
90impl From<ast::OperationType> for OperationKind {
91    fn from(value: ast::OperationType) -> Self {
92        match value {
93            ast::OperationType::Query => OperationKind::Query,
94            ast::OperationType::Mutation => OperationKind::Mutation,
95            ast::OperationType::Subscription => OperationKind::Subscription,
96        }
97    }
98}
99
100pub(crate) type SubgraphSchemas = HashMap<String, SubgraphSchema>;
101
102pub(crate) struct SubgraphSchema {
103    pub(crate) schema: Arc<Valid<apollo_compiler::Schema>>,
104    // TODO: Ideally should have separate nominal type for subgraph's schema hash
105    pub(crate) hash: SchemaHash,
106}
107
108impl SubgraphSchema {
109    pub(crate) fn new(schema: Valid<apollo_compiler::Schema>) -> Self {
110        let sdl = schema.serialize().no_indent().to_string();
111        Self {
112            schema: Arc::new(schema),
113            hash: SchemaHash::new(&sdl),
114        }
115    }
116}
117
118/// A fetch node.
119#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
120#[serde(rename_all = "camelCase")]
121pub(crate) struct FetchNode {
122    /// The name of the service or subgraph that the fetch is querying.
123    pub(crate) service_name: Arc<str>,
124
125    /// The data that is required for the subgraph fetch.
126    #[serde(skip_serializing_if = "Vec::is_empty")]
127    #[serde(default)]
128    pub(crate) requires: Vec<requires_selection::Selection>,
129
130    /// The variables that are used for the subgraph fetch.
131    pub(crate) variable_usages: Vec<Arc<str>>,
132
133    /// The GraphQL subquery that is used for the fetch.
134    pub(crate) operation: SerializableDocument,
135
136    /// The GraphQL subquery operation name.
137    pub(crate) operation_name: Option<Arc<str>>,
138
139    /// The GraphQL operation kind that is used for the fetch.
140    pub(crate) operation_kind: OperationKind,
141
142    /// Optional id used by Deferred nodes
143    pub(crate) id: Option<String>,
144
145    // Optionally describes a number of "rewrites" that query plan executors should apply to the data that is sent as input of this fetch.
146    pub(crate) input_rewrites: Option<Vec<rewrites::DataRewrite>>,
147
148    // Optionally describes a number of "rewrites" to apply to the data that received from a fetch (and before it is applied to the current in-memory results).
149    pub(crate) output_rewrites: Option<Vec<rewrites::DataRewrite>>,
150
151    // Optionally describes a number of "rewrites" to apply to the data that has already been received further up the tree
152    pub(crate) context_rewrites: Option<Vec<rewrites::DataRewrite>>,
153
154    // hash for the query and relevant parts of the schema. if two different schemas provide the exact same types, fields and directives
155    // affecting the query, then they will have the same hash
156    #[serde(default)]
157    pub(crate) schema_aware_hash: Arc<QueryHash>,
158
159    // authorization metadata for the subgraph query
160    #[serde(default)]
161    pub(crate) authorization: Arc<CacheKeyMetadata>,
162}
163
164#[derive(Default)]
165pub(crate) struct Variables {
166    pub(crate) variables: Object,
167    pub(crate) inverted_paths: Vec<Vec<Path>>,
168    pub(crate) contextual_arguments: Option<ContextualArguments>,
169}
170
171impl Variables {
172    #[instrument(skip_all, level = "debug", name = "make_variables")]
173    #[allow(clippy::too_many_arguments)]
174    pub(crate) fn new(
175        requires: &[requires_selection::Selection],
176        variable_usages: &[Arc<str>],
177        data: &Value,
178        current_dir: &Path,
179        request: &Arc<http::Request<Request>>,
180        schema: &Schema,
181        input_rewrites: &Option<Vec<rewrites::DataRewrite>>,
182        context_rewrites: &Option<Vec<rewrites::DataRewrite>>,
183    ) -> Option<Variables> {
184        let body = request.body();
185        let mut subgraph_context = SubgraphContext::new(data, schema, context_rewrites);
186        if !requires.is_empty() {
187            let mut variables = Object::with_capacity(1 + variable_usages.len());
188
189            variables.extend(variable_usages.iter().filter_map(|key| {
190                body.variables
191                    .get_key_value(key.as_ref())
192                    .map(|(variable_key, value)| (variable_key.clone(), value.clone()))
193            }));
194
195            let mut inverted_paths: Vec<Vec<Path>> = Vec::new();
196            let mut values: IndexSet<Value> = IndexSet::default();
197            data.select_values_and_paths(schema, current_dir, |path, value| {
198                // first get contextual values that are required
199                if let Some(context) = subgraph_context.as_mut() {
200                    context.execute_on_path(path);
201                }
202
203                let mut value = execute_selection_set(value, requires, schema, None);
204                if value.as_object().map(|o| !o.is_empty()).unwrap_or(false) {
205                    rewrites::apply_rewrites(schema, &mut value, input_rewrites);
206                    match values.get_index_of(&value) {
207                        Some(index) => {
208                            inverted_paths[index].push(path.clone());
209                        }
210                        None => {
211                            inverted_paths.push(vec![path.clone()]);
212                            values.insert(value);
213                            debug_assert!(inverted_paths.len() == values.len());
214                        }
215                    }
216                }
217            });
218
219            if values.is_empty() {
220                return None;
221            }
222
223            let representations = Value::Array(Vec::from_iter(values));
224            let contextual_arguments = match subgraph_context.as_mut() {
225                Some(context) => context.add_variables_and_get_args(&mut variables),
226                None => None,
227            };
228
229            variables.insert("representations", representations);
230            Some(Variables {
231                variables,
232                inverted_paths,
233                contextual_arguments,
234            })
235        } else {
236            // with nested operations (Query or Mutation has an operation returning a Query or Mutation),
237            // when the first fetch fails, the query plan will still execute up until the second fetch,
238            // where `requires` is empty (not a federated fetch), the current dir is not emmpty (child of
239            // the previous operation field) and the data is null. In that case, we recognize that we
240            // should not perform the next fetch
241            if !current_dir.is_empty()
242                && data
243                    .get_path(schema, current_dir)
244                    .map(|value| value.is_null())
245                    .unwrap_or(true)
246            {
247                return None;
248            }
249
250            Some(Variables {
251                variables: variable_usages
252                    .iter()
253                    .filter_map(|key| {
254                        body.variables
255                            .get_key_value(key.as_ref())
256                            .map(|(variable_key, value)| (variable_key.clone(), value.clone()))
257                    })
258                    .collect::<Object>(),
259                inverted_paths: Vec::new(),
260                contextual_arguments: None,
261            })
262        }
263    }
264}
265
266impl FetchNode {
267    #[allow(clippy::too_many_arguments)]
268    pub(crate) async fn subgraph_fetch(
269        &self,
270        service: BoxService,
271        subgraph_request: SubgraphRequest,
272        current_dir: &Path,
273        schema: &Schema,
274        paths: Vec<Vec<Path>>,
275        operation_str: &str,
276        variables: Map<ByteString, Value>,
277        hoist_orphan_errors: bool,
278    ) -> (Value, Vec<Error>) {
279        let (_parts, response) = match service
280            .oneshot(subgraph_request)
281            .instrument(tracing::trace_span!("subfetch_stream"))
282            .await
283            .map_to_graphql_error(self.service_name.to_string(), current_dir)
284        {
285            Err(e) => {
286                return (Value::default(), vec![e]);
287            }
288            Ok(res) => res.response.into_parts(),
289        };
290
291        super::log::trace_subfetch(&self.service_name, operation_str, &variables, &response);
292
293        if !response.is_primary() {
294            return (
295                Value::default(),
296                vec![
297                    FetchError::SubrequestUnexpectedPatchResponse {
298                        service: self.service_name.to_string(),
299                    }
300                    .to_graphql_error(Some(current_dir.to_owned())),
301                ],
302            );
303        }
304
305        let (value, errors) =
306            self.response_at_path(schema, current_dir, paths, response, hoist_orphan_errors);
307
308        (value, errors)
309    }
310
311    pub(crate) fn deferred_fetches(
312        current_dir: &Path,
313        id: &Option<String>,
314        deferred_fetches: &std::collections::HashMap<String, Sender<(Value, Vec<Error>)>>,
315        value: &Value,
316        errors: &[Error],
317    ) {
318        if let Some(id) = id
319            && let Some(sender) = deferred_fetches.get(id.as_str())
320        {
321            u64_counter!(
322                "apollo.router.operations.defer.fetch",
323                "Number of deferred responses fetched from subgraphs",
324                1
325            );
326            if let Err(e) = sender.clone().send((value.clone(), Vec::from(errors))) {
327                tracing::error!(
328                    "error sending fetch result at path {} and id {:?} for deferred response building: {}",
329                    current_dir,
330                    id,
331                    e
332                );
333            }
334        }
335    }
336
337    /// Maps a subgraph's response into what can be merged in the overall supergraph response. It
338    /// does this by making sure both the data and errors from a subgraph's response can be plugged
339    /// into the right slots for the supergraph response, and it does that by a bit of path
340    /// handling and manipulation
341    ///
342    /// When `hoist_orphan_errors` is true, entity-less errors are assigned to the nearest
343    /// non-array ancestor of `current_dir`, preventing error multiplication across array
344    /// elements. When false, they are assigned to `current_dir` as-is.
345    #[instrument(skip_all, level = "debug", name = "response_insert")]
346    pub(crate) fn response_at_path<'a>(
347        &'a self,
348        schema: &Schema,
349        current_dir: &'a Path,
350        inverted_paths: Vec<Vec<Path>>,
351        response: graphql::Response,
352        hoist_orphan_errors: bool,
353    ) -> (Value, Vec<Error>) {
354        if !self.requires.is_empty() {
355            let entities_path = Path(vec![json_ext::PathElement::Key(
356                "_entities".to_string(),
357                None,
358            )]);
359
360            // when hoist_orphan_errors is enabled, the fallback_dir is the immediate parent of
361            // the current_dir when the current_dir is wildcarded (ie, @, which is PathElement::Flatten)
362            //
363            // this prevents error multiplication across array elements
364            let error_dir = if hoist_orphan_errors {
365                let pos = current_dir
366                    .0
367                    .iter()
368                    .position(|e| matches!(e, json_ext::PathElement::Flatten(_)));
369                match pos {
370                    Some(i) => Path(current_dir.0[..i].to_vec()),
371                    None => current_dir.clone(),
372                }
373            } else {
374                current_dir.clone()
375            };
376
377            let mut errors: Vec<Error> = vec![];
378            for mut error in response.errors {
379                // the locations correspond to the subgraph query and cannot be linked to locations
380                // in the client query, so we remove them
381                error.locations = Vec::new();
382
383                // errors with path should be updated to the path of the entity they target
384                if let Some(ref path) = error.path {
385                    if path.starts_with(&entities_path) {
386                        // the error's path has the format '/_entities/1/other' so we ignore the
387                        // first element and then get the index
388                        match path.0.get(1) {
389                            Some(json_ext::PathElement::Index(i)) => {
390                                for values_path in
391                                    inverted_paths.get(*i).iter().flat_map(|v| v.iter())
392                                {
393                                    errors.push(
394                                        Error::builder()
395                                            .locations(error.locations.clone())
396                                            // append to the entity's path the error's path without
397                                            //`_entities` and the index
398                                            .path(Path::from_iter(
399                                                values_path.0.iter().chain(&path.0[2..]).cloned(),
400                                            ))
401                                            .message(error.message.clone())
402                                            .and_extension_code(error.extension_code())
403                                            .extensions(error.extensions.clone())
404                                            // re-use the original ID so we don't double count this error
405                                            .apollo_id(error.apollo_id())
406                                            .build(),
407                                    )
408                                }
409                            }
410                            _ => {
411                                error.path = Some(error_dir.clone());
412                                errors.push(error)
413                            }
414                        }
415                    } else {
416                        error.path = Some(error_dir.clone());
417                        errors.push(error);
418                    }
419                } else {
420                    error.path = Some(error_dir.clone());
421                    errors.push(error);
422                }
423            }
424
425            // we have to nest conditions and do early returns here
426            // because we need to take ownership of the inner value
427            if let Some(Value::Object(mut map)) = response.data
428                && let Some(entities) = map.remove("_entities")
429            {
430                tracing::trace!("received entities: {:?}", &entities);
431
432                if let Value::Array(array) = entities {
433                    let mut value = Value::default();
434
435                    for (index, mut entity) in array.into_iter().enumerate() {
436                        rewrites::apply_rewrites(schema, &mut entity, &self.output_rewrites);
437
438                        if let Some(paths) = inverted_paths.get(index) {
439                            if paths.len() > 1 {
440                                for path in &paths[1..] {
441                                    let _ = value.insert(path, entity.clone());
442                                }
443                            }
444
445                            if let Some(path) = paths.first() {
446                                let _ = value.insert(path, entity);
447                            }
448                        }
449                    }
450                    return (value, errors);
451                }
452            }
453
454            // if we get here, it means that the response was missing the `_entities` key
455            // This can happen if the subgraph failed during query execution e.g. for permissions checks.
456            // In this case we should add an additional error because the subgraph should have returned an error that will be bubbled up to the client.
457            // However, if they have not then print a warning to the logs.
458            if errors.is_empty() {
459                tracing::warn!(
460                    "Subgraph response from '{}' was missing key `_entities` and had no errors. This is likely a bug in the subgraph.",
461                    self.service_name
462                );
463            }
464
465            (Value::Null, errors)
466        } else {
467            let current_slice =
468                if matches!(current_dir.last(), Some(&json_ext::PathElement::Flatten(_))) {
469                    &current_dir.0[..current_dir.0.len() - 1]
470                } else {
471                    &current_dir.0[..]
472                };
473
474            let errors: Vec<Error> = response
475                .errors
476                .into_iter()
477                .map(|error| {
478                    let path = error
479                        .path
480                        .as_ref()
481                        .map(|path| {
482                            Path::from_iter(current_slice.iter().chain(path.iter()).cloned())
483                        })
484                        .unwrap_or_else(|| current_dir.clone());
485
486                    Error::builder()
487                        .locations(error.locations.clone())
488                        .path(path)
489                        .message(error.message.clone())
490                        .and_extension_code(error.extension_code())
491                        .extensions(error.extensions.clone())
492                        .apollo_id(error.apollo_id())
493                        .build()
494                })
495                .collect();
496            let mut data = response.data.unwrap_or_default();
497            rewrites::apply_rewrites(schema, &mut data, &self.output_rewrites);
498            (Value::from_path(current_dir, data), errors)
499        }
500    }
501
502    #[cfg(test)]
503    pub(crate) fn service_name(&self) -> &str {
504        &self.service_name
505    }
506
507    pub(crate) fn operation_kind(&self) -> &OperationKind {
508        &self.operation_kind
509    }
510
511    pub(crate) fn init_parsed_operation(
512        &mut self,
513        subgraph_schemas: &SubgraphSchemas,
514    ) -> Result<(), ValidationErrors> {
515        let schema = &subgraph_schemas[self.service_name.as_ref()];
516        self.operation.init_parsed(&schema.schema)?;
517        Ok(())
518    }
519
520    pub(crate) fn init_parsed_operation_and_hash_subquery(
521        &mut self,
522        subgraph_schemas: &SubgraphSchemas,
523    ) -> Result<(), ValidationErrors> {
524        let schema = &subgraph_schemas[self.service_name.as_ref()];
525        self.operation.init_parsed(&schema.schema)?;
526        self.schema_aware_hash = Arc::new(schema.hash.operation_hash(
527            self.operation.as_serialized(),
528            self.operation_name.as_deref(),
529        ));
530        Ok(())
531    }
532
533    pub(crate) fn extract_authorization_metadata(
534        &mut self,
535        schema: &Valid<apollo_compiler::Schema>,
536        global_authorisation_cache_key: &CacheKeyMetadata,
537    ) {
538        let doc = ExecutableDocument::parse(
539            schema,
540            self.operation.as_serialized().to_string(),
541            "query.graphql",
542        )
543        // Assume query planing creates a valid document: ignore parse errors
544        .unwrap_or_else(|invalid| invalid.partial);
545        let subgraph_query_cache_key = AuthorizationPlugin::generate_cache_metadata(
546            &doc,
547            self.operation_name.as_deref(),
548            schema,
549            !self.requires.is_empty(),
550        );
551
552        // we need to intersect the cache keys because the global key already takes into account
553        // the scopes and policies from the client request
554        self.authorization = Arc::new(AuthorizationPlugin::intersect_cache_keys_subgraph(
555            global_authorisation_cache_key,
556            &subgraph_query_cache_key,
557        ));
558    }
559}
560
561#[cfg(test)]
562mod tests {
563    use apollo_compiler::name;
564    use apollo_federation::query_plan::requires_selection;
565    use apollo_federation::query_plan::serializable_document::SerializableDocument;
566    use rstest::rstest;
567    use serde_json_bytes::json;
568
569    use super::*;
570    use crate::Configuration;
571
572    fn test_schema() -> Schema {
573        let sdl = r#"
574            schema
575                @link(url: "https://specs.apollo.dev/link/v1.0")
576                @link(url: "https://specs.apollo.dev/join/v0.3", for: EXECUTION)
577            {
578                query: Query
579            }
580            directive @link(url: String, as: String, for: link__Purpose, import: [link__Import]) repeatable on SCHEMA
581            directive @join__type(graph: join__Graph!, key: join__FieldSet, extension: Boolean! = false, resolvable: Boolean! = true, isInterfaceObject: Boolean! = false) repeatable on OBJECT | INTERFACE | UNION | ENUM | INPUT_OBJECT | SCALAR
582            directive @join__graph(name: String!, url: String!) on ENUM_VALUE
583
584            scalar link__Import
585            scalar join__FieldSet
586
587            enum link__Purpose { SECURITY EXECUTION }
588
589            enum join__Graph {
590                TEST @join__graph(name: "test", url: "http://localhost:4001/graphql")
591            }
592
593            type Query {
594                me: String
595            }
596        "#;
597        Schema::parse(sdl, &Configuration::default()).unwrap()
598    }
599
600    fn make_fetch_node(requires: Vec<requires_selection::Selection>) -> FetchNode {
601        FetchNode {
602            service_name: "test".into(),
603            requires,
604            variable_usages: vec![],
605            operation: SerializableDocument::from_string("{ me }"),
606            operation_name: None,
607            operation_kind: OperationKind::Query,
608            id: None,
609            input_rewrites: None,
610            output_rewrites: None,
611            context_rewrites: None,
612            schema_aware_hash: Default::default(),
613            authorization: Default::default(),
614        }
615    }
616
617    fn make_requires() -> Vec<requires_selection::Selection> {
618        vec![requires_selection::Selection::InlineFragment(
619            requires_selection::InlineFragment {
620                type_condition: Some(name!("T")),
621                selections: vec![requires_selection::Selection::Field(
622                    requires_selection::Field {
623                        alias: None,
624                        name: name!("id"),
625                        selections: Vec::new(),
626                    },
627                )],
628            },
629        )]
630    }
631
632    fn key(name: &str) -> json_ext::PathElement {
633        json_ext::PathElement::Key(name.to_string(), None)
634    }
635
636    fn index(i: usize) -> json_ext::PathElement {
637        json_ext::PathElement::Index(i)
638    }
639
640    fn flatten() -> json_ext::PathElement {
641        json_ext::PathElement::Flatten(None)
642    }
643
644    fn make_error(path: Option<Path>) -> graphql::Error {
645        match path {
646            Some(p) => graphql::Error::builder().message("err").path(p).build(),
647            None => graphql::Error::builder().message("err").build(),
648        }
649    }
650
651    #[rstest]
652    #[case::single_key(
653        vec![key("topLevel")],
654        Some(json!({"field": "value"})),
655        json!({"topLevel": {"field": "value"}})
656    )]
657    #[case::no_data(
658        vec![key("topLevel")],
659        None,
660        json!({"topLevel": null})
661    )]
662    #[case::empty_current_dir(
663        vec![],
664        Some(json!({"me": "hello"})),
665        json!({"me": "hello"})
666    )]
667    #[case::deep_nesting(
668        vec![key("a"), key("b"), key("c"), key("d")],
669        Some(json!({"value": 42})),
670        json!({"a": {"b": {"c": {"d": {"value": 42}}}}})
671    )]
672    fn root_fetch_data_wrapping(
673        #[case] dir_elements: Vec<json_ext::PathElement>,
674        #[case] data: Option<Value>,
675        #[case] expected: Value,
676    ) {
677        let schema = test_schema();
678        let node = make_fetch_node(vec![]);
679        let current_dir = Path(dir_elements);
680        let response = graphql::Response {
681            data,
682            ..Default::default()
683        };
684        let (value, errors) = node.response_at_path(&schema, &current_dir, vec![], response, false);
685
686        assert!(errors.is_empty());
687        assert_eq!(value, expected);
688    }
689
690    #[rstest]
691    #[case::prepends_current_dir(
692        vec![key("top"), key("nested")],
693        Some(Path(vec![key("field")])),
694        Path(vec![key("top"), key("nested"), key("field")])
695    )]
696    #[case::no_error_path_uses_current_dir(
697        vec![key("top")],
698        None,
699        Path(vec![key("top")])
700    )]
701    #[case::trailing_flatten_stripped(
702        vec![key("list"), flatten()],
703        Some(Path(vec![key("name")])),
704        Path(vec![key("list"), key("name")])
705    )]
706    #[case::no_error_path_keeps_flatten(
707        vec![key("list"), flatten()],
708        None,
709        Path(vec![key("list"), flatten()])
710    )]
711    #[case::index_in_error_path(
712        vec![key("items")],
713        Some(Path(vec![index(2), key("name")])),
714        Path(vec![key("items"), index(2), key("name")])
715    )]
716    #[case::flatten_mid_path_not_stripped(
717        vec![key("a"), flatten(), key("b")],
718        Some(Path(vec![key("c")])),
719        Path(vec![key("a"), flatten(), key("b"), key("c")])
720    )]
721    fn root_fetch_error_path(
722        #[case] dir_elements: Vec<json_ext::PathElement>,
723        #[case] error_path: Option<Path>,
724        #[case] expected_path: Path,
725    ) {
726        let schema = test_schema();
727        let node = make_fetch_node(vec![]);
728        let current_dir = Path(dir_elements);
729        let response = graphql::Response::builder()
730            .error(make_error(error_path))
731            .build();
732
733        let (_, errors) = node.response_at_path(&schema, &current_dir, vec![], response, false);
734
735        assert_eq!(errors.len(), 1);
736        assert_eq!(errors[0].path.as_ref().unwrap(), &expected_path);
737    }
738
739    #[test]
740    fn root_fetch_multiple_errors() {
741        let schema = test_schema();
742        let node = make_fetch_node(vec![]);
743        let current_dir = Path(vec![key("root")]);
744        let response = graphql::Response::builder()
745            .error(
746                graphql::Error::builder()
747                    .message("error 1")
748                    .path(Path(vec![key("a")]))
749                    .build(),
750            )
751            .error(
752                graphql::Error::builder()
753                    .message("error 2")
754                    .path(Path(vec![key("b")]))
755                    .build(),
756            )
757            .error(graphql::Error::builder().message("error 3").build())
758            .build();
759
760        let (_, errors) = node.response_at_path(&schema, &current_dir, vec![], response, false);
761
762        assert_eq!(errors.len(), 3);
763        assert_eq!(
764            errors[0].path.as_ref().unwrap(),
765            &Path(vec![key("root"), key("a")])
766        );
767        assert_eq!(
768            errors[1].path.as_ref().unwrap(),
769            &Path(vec![key("root"), key("b")])
770        );
771        assert_eq!(errors[2].path.as_ref().unwrap(), &Path(vec![key("root")]));
772    }
773
774    #[test]
775    fn root_fetch_preserves_error_extension_code() {
776        let schema = test_schema();
777        let node = make_fetch_node(vec![]);
778        let current_dir = Path(vec![key("root")]);
779        let response = graphql::Response::builder()
780            .error(
781                graphql::Error::builder()
782                    .message("auth error")
783                    .extension_code("UNAUTHORIZED")
784                    .path(Path(vec![key("field")]))
785                    .build(),
786            )
787            .build();
788
789        let (_, errors) = node.response_at_path(&schema, &current_dir, vec![], response, false);
790
791        assert_eq!(errors.len(), 1);
792        assert_eq!(errors[0].extension_code().as_deref(), Some("UNAUTHORIZED"));
793    }
794
795    #[rstest]
796    #[case::entities_path_no_index(
797        vec![key("users"), flatten()],
798        Some(Path(vec![key("_entities")])),
799        Path(vec![key("users")])
800    )]
801    #[case::non_entities_prefix(
802        vec![key("a"), key("b")],
803        Some(Path(vec![key("other"), key("field")])),
804        Path(vec![key("a"), key("b")])
805    )]
806    #[case::no_path_truncates_at_flatten(
807        vec![key("a"), flatten(), key("b")],
808        None,
809        Path(vec![key("a")])
810    )]
811    #[case::no_flatten_equals_current_dir(
812        vec![key("a"), key("b"), key("c")],
813        None,
814        Path(vec![key("a"), key("b"), key("c")])
815    )]
816    #[case::two_flattens_truncates_at_first(
817        vec![key("a"), flatten(), key("b"), flatten()],
818        None,
819        Path(vec![key("a")])
820    )]
821    #[case::entities_key_not_index(
822        vec![key("root")],
823        Some(Path(vec![key("_entities"), key("notAnIndex")])),
824        Path(vec![key("root")])
825    )]
826    fn entity_error_uses_fallback_dir(
827        #[case] dir_elements: Vec<json_ext::PathElement>,
828        #[case] error_path: Option<Path>,
829        #[case] expected_path: Path,
830    ) {
831        let schema = test_schema();
832        let node = make_fetch_node(make_requires());
833        let current_dir = Path(dir_elements);
834        let response = graphql::Response::builder()
835            .data(json!({"_entities": []}))
836            .error(make_error(error_path))
837            .build();
838
839        let (_, errors) = node.response_at_path(&schema, &current_dir, vec![], response, true);
840
841        assert_eq!(errors.len(), 1);
842        assert_eq!(errors[0].path.as_ref().unwrap(), &expected_path);
843    }
844
845    #[rstest]
846    #[case::flatten_preserved_when_disabled(
847        vec![key("users"), flatten()],
848        None,
849        Path(vec![key("users"), flatten()])
850    )]
851    #[case::nested_flatten_preserved_when_disabled(
852        vec![key("a"), flatten(), key("b"), flatten()],
853        None,
854        Path(vec![key("a"), flatten(), key("b"), flatten()])
855    )]
856    #[case::non_entities_path_gets_current_dir_when_disabled(
857        vec![key("items"), flatten()],
858        Some(Path(vec![key("something")])),
859        Path(vec![key("items"), flatten()])
860    )]
861    fn entity_error_uses_current_dir_when_hoist_disabled(
862        #[case] dir_elements: Vec<json_ext::PathElement>,
863        #[case] error_path: Option<Path>,
864        #[case] expected_path: Path,
865    ) {
866        let schema = test_schema();
867        let node = make_fetch_node(make_requires());
868        let current_dir = Path(dir_elements);
869        let response = graphql::Response::builder()
870            .data(json!({"_entities": []}))
871            .error(make_error(error_path))
872            .build();
873
874        let (_, errors) = node.response_at_path(&schema, &current_dir, vec![], response, false);
875
876        assert_eq!(errors.len(), 1);
877        assert_eq!(errors[0].path.as_ref().unwrap(), &expected_path);
878    }
879
880    #[test]
881    fn entity_fetch_basic_entities_inserted_at_inverted_paths() {
882        let schema = test_schema();
883        let node = make_fetch_node(make_requires());
884        let current_dir = Path(vec![key("topField"), flatten()]);
885        let inverted_paths = vec![
886            vec![Path(vec![key("topField"), index(0)])],
887            vec![Path(vec![key("topField"), index(1)])],
888        ];
889        let response = graphql::Response::builder()
890            .data(json!({
891                "_entities": [
892                    {"name": "Alice"},
893                    {"name": "Bob"}
894                ]
895            }))
896            .build();
897
898        let (value, errors) =
899            node.response_at_path(&schema, &current_dir, inverted_paths, response, false);
900
901        assert!(errors.is_empty());
902        let top = value.as_object().unwrap().get("topField").unwrap();
903        let arr = top.as_array().unwrap();
904        assert_eq!(arr[0], json!({"name": "Alice"}));
905        assert_eq!(arr[1], json!({"name": "Bob"}));
906    }
907
908    #[test]
909    fn entity_fetch_entity_at_multiple_inverted_paths() {
910        let schema = test_schema();
911        let node = make_fetch_node(make_requires());
912        let current_dir = Path(vec![key("field"), flatten()]);
913        let inverted_paths = vec![vec![
914            Path(vec![key("field"), index(0)]),
915            Path(vec![key("field"), index(2)]),
916        ]];
917        let response = graphql::Response::builder()
918            .data(json!({
919                "_entities": [{"name": "Alice"}]
920            }))
921            .build();
922
923        let (value, errors) =
924            node.response_at_path(&schema, &current_dir, inverted_paths, response, false);
925
926        assert!(errors.is_empty());
927        let arr = value
928            .as_object()
929            .unwrap()
930            .get("field")
931            .unwrap()
932            .as_array()
933            .unwrap();
934        assert_eq!(arr[0], json!({"name": "Alice"}));
935        assert_eq!(arr[2], json!({"name": "Alice"}));
936    }
937
938    #[test]
939    fn entity_fetch_empty_entities_array_returns_default_value() {
940        let schema = test_schema();
941        let node = make_fetch_node(make_requires());
942        let current_dir = Path(vec![key("field")]);
943        let response = graphql::Response::builder()
944            .data(json!({"_entities": []}))
945            .build();
946
947        let (value, errors) = node.response_at_path(&schema, &current_dir, vec![], response, false);
948
949        assert!(errors.is_empty());
950        assert_eq!(value, Value::default());
951    }
952
953    #[test]
954    fn entity_fetch_more_entities_than_inverted_paths() {
955        let schema = test_schema();
956        let node = make_fetch_node(make_requires());
957        let current_dir = Path(vec![key("f"), flatten()]);
958        let inverted_paths = vec![vec![Path(vec![key("f"), index(0)])]];
959        let response = graphql::Response::builder()
960            .data(json!({
961                "_entities": [
962                    {"name": "Alice"},
963                    {"name": "Bob"},
964                    {"name": "Charlie"}
965                ]
966            }))
967            .build();
968
969        let (value, errors) =
970            node.response_at_path(&schema, &current_dir, inverted_paths, response, false);
971
972        assert!(errors.is_empty());
973        let arr = value
974            .as_object()
975            .unwrap()
976            .get("f")
977            .unwrap()
978            .as_array()
979            .unwrap();
980        assert_eq!(arr[0], json!({"name": "Alice"}));
981    }
982
983    #[test]
984    fn entity_fetch_error_with_entities_path_and_index_remapped() {
985        let schema = test_schema();
986        let node = make_fetch_node(make_requires());
987        let current_dir = Path(vec![key("users"), flatten()]);
988        let inverted_paths = vec![
989            vec![Path(vec![key("users"), index(0)])],
990            vec![Path(vec![key("users"), index(1)])],
991        ];
992        let response = graphql::Response::builder()
993            .data(json!({"_entities": [null, null]}))
994            .error(
995                graphql::Error::builder()
996                    .message("entity error")
997                    .path(Path(vec![key("_entities"), index(1), key("name")]))
998                    .build(),
999            )
1000            .build();
1001
1002        let (_, errors) =
1003            node.response_at_path(&schema, &current_dir, inverted_paths, response, false);
1004
1005        assert_eq!(errors.len(), 1);
1006        assert_eq!(
1007            errors[0].path.as_ref().unwrap(),
1008            &Path(vec![key("users"), index(1), key("name")])
1009        );
1010        assert_eq!(errors[0].message, "entity error");
1011    }
1012
1013    #[test]
1014    fn entity_fetch_error_locations_cleared() {
1015        let schema = test_schema();
1016        let node = make_fetch_node(make_requires());
1017        let current_dir = Path(vec![key("data")]);
1018        let response = graphql::Response::builder()
1019            .data(json!({"_entities": [null]}))
1020            .error(
1021                graphql::Error::builder()
1022                    .message("err")
1023                    .locations(vec![graphql::Location { line: 1, column: 5 }])
1024                    .path(Path(vec![key("_entities"), index(0), key("x")]))
1025                    .build(),
1026            )
1027            .build();
1028
1029        let (_, errors) = node.response_at_path(
1030            &schema,
1031            &current_dir,
1032            vec![vec![Path(vec![key("data"), index(0)])]],
1033            response,
1034            false,
1035        );
1036
1037        assert_eq!(errors.len(), 1);
1038        assert!(errors[0].locations.is_empty());
1039    }
1040
1041    #[test]
1042    fn entity_fetch_error_index_remapped_to_multiple_inverted_paths() {
1043        let schema = test_schema();
1044        let node = make_fetch_node(make_requires());
1045        let current_dir = Path(vec![key("items"), flatten()]);
1046        let inverted_paths = vec![vec![
1047            Path(vec![key("items"), index(0)]),
1048            Path(vec![key("items"), index(3)]),
1049        ]];
1050        let response = graphql::Response::builder()
1051            .data(json!({"_entities": [null]}))
1052            .error(
1053                graphql::Error::builder()
1054                    .message("err")
1055                    .path(Path(vec![key("_entities"), index(0), key("name")]))
1056                    .build(),
1057            )
1058            .build();
1059
1060        let (_, errors) =
1061            node.response_at_path(&schema, &current_dir, inverted_paths, response, false);
1062
1063        assert_eq!(errors.len(), 2);
1064        assert_eq!(
1065            errors[0].path.as_ref().unwrap(),
1066            &Path(vec![key("items"), index(0), key("name")])
1067        );
1068        assert_eq!(
1069            errors[1].path.as_ref().unwrap(),
1070            &Path(vec![key("items"), index(3), key("name")])
1071        );
1072    }
1073
1074    #[test]
1075    fn entity_fetch_error_index_out_of_bounds_inverted_paths_no_panic() {
1076        let schema = test_schema();
1077        let node = make_fetch_node(make_requires());
1078        let current_dir = Path(vec![key("x")]);
1079        let response = graphql::Response::builder()
1080            .data(json!({"_entities": []}))
1081            .error(
1082                graphql::Error::builder()
1083                    .message("oob")
1084                    .path(Path(vec![key("_entities"), index(5), key("f")]))
1085                    .build(),
1086            )
1087            .build();
1088
1089        let (_, errors) = node.response_at_path(&schema, &current_dir, vec![], response, false);
1090
1091        assert!(errors.is_empty());
1092    }
1093
1094    #[test]
1095    fn entity_fetch_preserves_extension_code_on_remapped_errors() {
1096        let schema = test_schema();
1097        let node = make_fetch_node(make_requires());
1098        let current_dir = Path(vec![key("users"), flatten()]);
1099        let inverted_paths = vec![vec![Path(vec![key("users"), index(0)])]];
1100        let response = graphql::Response::builder()
1101            .data(json!({"_entities": [null]}))
1102            .error(
1103                graphql::Error::builder()
1104                    .message("forbidden")
1105                    .extension_code("FORBIDDEN")
1106                    .path(Path(vec![key("_entities"), index(0)]))
1107                    .build(),
1108            )
1109            .build();
1110
1111        let (_, errors) =
1112            node.response_at_path(&schema, &current_dir, inverted_paths, response, false);
1113
1114        assert_eq!(errors.len(), 1);
1115        assert_eq!(errors[0].extension_code().as_deref(), Some("FORBIDDEN"));
1116        assert_eq!(
1117            errors[0].path.as_ref().unwrap(),
1118            &Path(vec![key("users"), index(0)])
1119        );
1120    }
1121
1122    #[test]
1123    fn entity_fetch_error_appends_remaining_path_after_index() {
1124        let schema = test_schema();
1125        let node = make_fetch_node(make_requires());
1126        let current_dir = Path(vec![key("data"), flatten()]);
1127        let inverted_paths = vec![vec![Path(vec![key("data"), index(0)])]];
1128        let response = graphql::Response::builder()
1129            .data(json!({"_entities": [null]}))
1130            .error(
1131                graphql::Error::builder()
1132                    .message("nested err")
1133                    .path(Path(vec![
1134                        key("_entities"),
1135                        index(0),
1136                        key("address"),
1137                        key("city"),
1138                    ]))
1139                    .build(),
1140            )
1141            .build();
1142
1143        let (_, errors) =
1144            node.response_at_path(&schema, &current_dir, inverted_paths, response, false);
1145
1146        assert_eq!(errors.len(), 1);
1147        assert_eq!(
1148            errors[0].path.as_ref().unwrap(),
1149            &Path(vec![key("data"), index(0), key("address"), key("city")])
1150        );
1151    }
1152
1153    #[test]
1154    fn entity_fetch_missing_entities_key_with_errors() {
1155        let schema = test_schema();
1156        let node = make_fetch_node(make_requires());
1157        let current_dir = Path(vec![key("users"), flatten()]);
1158        let response = graphql::Response::builder()
1159            .data(json!({"something": "else"}))
1160            .error(
1161                graphql::Error::builder()
1162                    .message("permission denied")
1163                    .build(),
1164            )
1165            .build();
1166
1167        let (value, errors) = node.response_at_path(&schema, &current_dir, vec![], response, false);
1168
1169        assert_eq!(value, Value::Null);
1170        assert_eq!(errors.len(), 1);
1171        assert_eq!(errors[0].message, "permission denied");
1172    }
1173
1174    #[test]
1175    fn entity_fetch_missing_entities_key_no_errors() {
1176        let schema = test_schema();
1177        let node = make_fetch_node(make_requires());
1178        let current_dir = Path(vec![key("users")]);
1179        let response = graphql::Response::builder()
1180            .data(json!({"something": "else"}))
1181            .build();
1182
1183        let (value, errors) = node.response_at_path(&schema, &current_dir, vec![], response, false);
1184
1185        assert_eq!(value, Value::Null);
1186        assert!(errors.is_empty());
1187    }
1188
1189    #[test]
1190    fn entity_fetch_null_data_returns_null_with_errors() {
1191        let schema = test_schema();
1192        let node = make_fetch_node(make_requires());
1193        let current_dir = Path(vec![key("field")]);
1194        let response = graphql::Response::builder()
1195            .error(graphql::Error::builder().message("subgraph error").build())
1196            .build();
1197
1198        let (value, errors) = node.response_at_path(&schema, &current_dir, vec![], response, false);
1199
1200        assert_eq!(value, Value::Null);
1201        assert_eq!(errors.len(), 1);
1202    }
1203
1204    #[test]
1205    fn entity_fetch_null_data_errors_get_fallback_dir() {
1206        let schema = test_schema();
1207        let node = make_fetch_node(make_requires());
1208        let current_dir = Path(vec![key("users"), flatten(), key("reviews")]);
1209        let expected_fallback = Path(vec![key("users")]);
1210        let response = graphql::Response::builder()
1211            .error(graphql::Error::builder().message("pathless error").build())
1212            .error(
1213                graphql::Error::builder()
1214                    .message("non-entities path")
1215                    .path(Path(vec![key("something")]))
1216                    .build(),
1217            )
1218            .error(
1219                graphql::Error::builder()
1220                    .message("entities no index")
1221                    .path(Path(vec![key("_entities")]))
1222                    .build(),
1223            )
1224            .build();
1225
1226        let (value, errors) = node.response_at_path(&schema, &current_dir, vec![], response, true);
1227
1228        assert_eq!(value, Value::Null);
1229        assert_eq!(errors.len(), 3);
1230        for error in &errors {
1231            assert_eq!(
1232                error.path.as_ref().unwrap(),
1233                &expected_fallback,
1234                "error '{}' did not get fallback_dir",
1235                error.message,
1236            );
1237        }
1238    }
1239
1240    #[test]
1241    fn entity_fetch_missing_entities_key_errors_get_fallback_dir() {
1242        let schema = test_schema();
1243        let node = make_fetch_node(make_requires());
1244        let current_dir = Path(vec![key("items"), flatten()]);
1245        let expected_fallback = Path(vec![key("items")]);
1246        let response = graphql::Response::builder()
1247            .data(json!({"something": "else"}))
1248            .error(
1249                graphql::Error::builder()
1250                    .message("permission denied")
1251                    .build(),
1252            )
1253            .error(
1254                graphql::Error::builder()
1255                    .message("other error")
1256                    .path(Path(vec![key("unrelated")]))
1257                    .build(),
1258            )
1259            .build();
1260
1261        let (value, errors) = node.response_at_path(&schema, &current_dir, vec![], response, true);
1262
1263        assert_eq!(value, Value::Null);
1264        assert_eq!(errors.len(), 2);
1265        for error in &errors {
1266            assert_eq!(
1267                error.path.as_ref().unwrap(),
1268                &expected_fallback,
1269                "error '{}' did not get fallback_dir",
1270                error.message,
1271            );
1272        }
1273    }
1274
1275    #[test]
1276    fn entity_fetch_entities_not_array_returns_null() {
1277        let schema = test_schema();
1278        let node = make_fetch_node(make_requires());
1279        let current_dir = Path(vec![key("field")]);
1280        let response = graphql::Response::builder()
1281            .data(json!({"_entities": "not_an_array"}))
1282            .build();
1283
1284        let (value, errors) = node.response_at_path(&schema, &current_dir, vec![], response, false);
1285
1286        assert_eq!(value, Value::Null);
1287        assert!(errors.is_empty());
1288    }
1289
1290    #[test]
1291    fn entity_fetch_entities_not_array_errors_get_fallback_dir() {
1292        let schema = test_schema();
1293        let node = make_fetch_node(make_requires());
1294        let current_dir = Path(vec![key("products"), flatten()]);
1295        let expected_fallback = Path(vec![key("products")]);
1296        let response = graphql::Response::builder()
1297            .data(json!({"_entities": 42}))
1298            .error(graphql::Error::builder().message("bad entities").build())
1299            .build();
1300
1301        let (value, errors) = node.response_at_path(&schema, &current_dir, vec![], response, true);
1302
1303        assert_eq!(value, Value::Null);
1304        assert_eq!(errors.len(), 1);
1305        assert_eq!(errors[0].path.as_ref().unwrap(), &expected_fallback);
1306    }
1307
1308    #[test]
1309    fn entity_fetch_data_is_non_object_returns_null_with_fallback_errors() {
1310        let schema = test_schema();
1311        let node = make_fetch_node(make_requires());
1312        let current_dir = Path(vec![key("orders"), flatten(), key("items")]);
1313        let expected_fallback = Path(vec![key("orders")]);
1314        let response = graphql::Response {
1315            data: Some(Value::Null),
1316            errors: vec![graphql::Error::builder().message("null data error").build()],
1317            ..Default::default()
1318        };
1319
1320        let (value, errors) = node.response_at_path(&schema, &current_dir, vec![], response, true);
1321
1322        assert_eq!(value, Value::Null);
1323        assert_eq!(errors.len(), 1);
1324        assert_eq!(errors[0].path.as_ref().unwrap(), &expected_fallback);
1325    }
1326
1327    #[test]
1328    fn entity_fetch_mixed_error_types() {
1329        let schema = test_schema();
1330        let node = make_fetch_node(make_requires());
1331        let current_dir = Path(vec![key("users"), flatten()]);
1332        let inverted_paths = vec![
1333            vec![Path(vec![key("users"), index(0)])],
1334            vec![Path(vec![key("users"), index(1)])],
1335        ];
1336        let response = graphql::Response::builder()
1337            .data(json!({"_entities": [{"name": "Alice"}, null]}))
1338            .error(
1339                graphql::Error::builder()
1340                    .message("entity 1 error")
1341                    .path(Path(vec![key("_entities"), index(1), key("field")]))
1342                    .build(),
1343            )
1344            .error(
1345                graphql::Error::builder()
1346                    .message("general error")
1347                    .path(Path(vec![key("other")]))
1348                    .build(),
1349            )
1350            .error(graphql::Error::builder().message("pathless").build())
1351            .build();
1352
1353        let (_, errors) =
1354            node.response_at_path(&schema, &current_dir, inverted_paths, response, true);
1355
1356        assert_eq!(errors.len(), 3);
1357        assert_eq!(
1358            errors[0].path.as_ref().unwrap(),
1359            &Path(vec![key("users"), index(1), key("field")])
1360        );
1361        assert_eq!(errors[1].path.as_ref().unwrap(), &Path(vec![key("users")]));
1362        assert_eq!(errors[2].path.as_ref().unwrap(), &Path(vec![key("users")]));
1363    }
1364}