Skip to main content

hive_router_plan_executor/execution/
plan.rs

1use std::collections::{BTreeSet, HashMap};
2
3use bytes::{BufMut, Bytes};
4use futures::{future::BoxFuture, stream::FuturesUnordered, StreamExt};
5use hive_router_internal::telemetry::traces::spans::graphql::{
6    GraphQLOperationSpan, GraphQLSpanOperationIdentity, GraphQLSubgraphOperationSpan,
7};
8use hive_router_query_planner::{
9    planner::plan_nodes::{
10        ConditionNode, FetchNode, FetchRewrite, FlattenNode, FlattenNodePath, PlanNode, QueryPlan,
11    },
12    state::supergraph_state::OperationKind,
13};
14use http::HeaderMap;
15use sonic_rs::ValueRef;
16use tracing::Instrument;
17
18use crate::{
19    context::ExecutionContext,
20    execution::{
21        client_request_details::ClientRequestDetails,
22        error::{IntoPlanExecutionError, LazyPlanContext, PlanExecutionError},
23        jwt_forward::JwtAuthForwardingPlan,
24        rewrites::FetchRewriteExt,
25    },
26    executors::{common::SubgraphExecutionRequest, map::SubgraphExecutorMap},
27    headers::{
28        plan::HeaderRulesPlan,
29        request::modify_subgraph_request_headers,
30        response::{apply_subgraph_response_headers, modify_client_response_headers},
31    },
32    introspection::{
33        resolve::{resolve_introspection, IntrospectionContext},
34        schema::SchemaMetadata,
35    },
36    projection::{
37        plan::FieldProjectionPlan,
38        request::{project_requires, RequestProjectionContext},
39        response::project_by_operation,
40    },
41    response::{
42        graphql_error::{GraphQLError, GraphQLErrorPath},
43        merge::deep_merge,
44        subgraph_response::SubgraphResponse,
45        value::Value,
46    },
47    utils::{
48        consts::{CLOSE_BRACKET, OPEN_BRACKET},
49        traverse::{traverse_and_callback, traverse_and_callback_mut},
50    },
51};
52
53pub struct QueryPlanExecutionContext<'exec> {
54    pub query_plan: &'exec QueryPlan,
55    pub projection_plan: &'exec [FieldProjectionPlan],
56    pub headers_plan: &'exec HeaderRulesPlan,
57    pub variable_values: &'exec Option<HashMap<String, sonic_rs::Value>>,
58    pub extensions: Option<HashMap<String, sonic_rs::Value>>,
59    pub client_request: &'exec ClientRequestDetails<'exec>,
60    pub introspection_context: &'exec IntrospectionContext<'exec, 'static>,
61    pub operation_type_name: &'exec str,
62    pub executors: &'exec SubgraphExecutorMap,
63    pub jwt_auth_forwarding: &'exec Option<JwtAuthForwardingPlan>,
64    pub initial_errors: Vec<GraphQLError>,
65    pub span: &'exec GraphQLOperationSpan,
66}
67
68pub struct PlanExecutionOutput {
69    pub body: Vec<u8>,
70    pub headers: HeaderMap,
71    pub error_count: usize,
72}
73
74pub async fn execute_query_plan<'exec>(
75    ctx: QueryPlanExecutionContext<'exec>,
76) -> Result<PlanExecutionOutput, PlanExecutionError> {
77    let init_value = if let Some(introspection_query) = ctx.introspection_context.query {
78        resolve_introspection(introspection_query, ctx.introspection_context)
79    } else if ctx.projection_plan.is_empty() {
80        Value::Null
81    } else {
82        Value::Object(Vec::new())
83    };
84
85    let mut exec_ctx = ExecutionContext::new(ctx.query_plan, init_value, ctx.initial_errors);
86    let executor = Executor::new(
87        ctx.variable_values,
88        ctx.executors,
89        ctx.introspection_context.metadata,
90        ctx.client_request,
91        ctx.headers_plan,
92        ctx.jwt_auth_forwarding,
93        // Deduplicate subgraph requests only if the operation type is a query
94        ctx.operation_type_name == "Query",
95    );
96
97    if let Some(node) = &ctx.query_plan.node {
98        executor.execute_plan_node(&mut exec_ctx, node).await?;
99    }
100
101    let mut response_headers = HeaderMap::new();
102    modify_client_response_headers(exec_ctx.response_headers_aggregator, &mut response_headers)
103        .with_plan_context(LazyPlanContext {
104            subgraph_name: || None,
105            affected_path: || None,
106        })?;
107
108    let final_response = &exec_ctx.final_response;
109    let error_count = exec_ctx.errors.len();
110    if error_count > 0 {
111        ctx.span.record_error_count(error_count);
112        ctx.span
113            .record_errors(|| exec_ctx.errors.iter().map(|e| e.into()).collect());
114    }
115
116    let body = project_by_operation(
117        final_response,
118        exec_ctx.errors,
119        &ctx.extensions,
120        ctx.operation_type_name,
121        ctx.projection_plan,
122        ctx.variable_values,
123        exec_ctx.response_storage.estimate_final_response_size(),
124        ctx.introspection_context.metadata,
125    )
126    .with_plan_context(LazyPlanContext {
127        subgraph_name: || None,
128        affected_path: || None,
129    })?;
130
131    Ok(PlanExecutionOutput {
132        body,
133        headers: response_headers,
134        error_count,
135    })
136}
137
138pub struct Executor<'exec> {
139    variable_values: &'exec Option<HashMap<String, sonic_rs::Value>>,
140    schema_metadata: &'exec SchemaMetadata,
141    executors: &'exec SubgraphExecutorMap,
142    client_request: &'exec ClientRequestDetails<'exec>,
143    headers_plan: &'exec HeaderRulesPlan,
144    jwt_forwarding_plan: &'exec Option<JwtAuthForwardingPlan>,
145    dedupe_subgraph_requests: bool,
146}
147
148struct ConcurrencyScope<'exec, T> {
149    jobs: FuturesUnordered<BoxFuture<'exec, T>>,
150}
151
152impl<'exec, T> ConcurrencyScope<'exec, T> {
153    fn new() -> Self {
154        Self {
155            jobs: FuturesUnordered::new(),
156        }
157    }
158
159    fn spawn(&mut self, future: BoxFuture<'exec, T>) {
160        self.jobs.push(future);
161    }
162
163    async fn join_all(mut self) -> Vec<T> {
164        let mut results = Vec::with_capacity(self.jobs.len());
165        while let Some(result) = self.jobs.next().await {
166            results.push(result);
167        }
168        results
169    }
170}
171
172struct FetchJob<'exec> {
173    fetch_node_id: i64,
174    subgraph_name: &'exec str,
175    response: SubgraphResponse<'exec>,
176}
177
178struct FlattenFetchJob<'exec> {
179    flatten_node_path: &'exec FlattenNodePath,
180    response: SubgraphResponse<'exec>,
181    fetch_node_id: i64,
182    subgraph_name: &'exec str,
183    representation_hashes: Vec<u64>,
184    representation_hash_to_index: HashMap<u64, usize>,
185}
186
187enum ExecutionJob<'exec> {
188    Fetch(FetchJob<'exec>),
189    FlattenFetch(FlattenFetchJob<'exec>),
190}
191
192impl<'exec> Executor<'exec> {
193    pub fn new(
194        variable_values: &'exec Option<HashMap<String, sonic_rs::Value>>,
195        executors: &'exec SubgraphExecutorMap,
196        schema_metadata: &'exec SchemaMetadata,
197        client_request: &'exec ClientRequestDetails<'exec>,
198        headers_plan: &'exec HeaderRulesPlan,
199        jwt_forwarding_plan: &'exec Option<JwtAuthForwardingPlan>,
200        dedupe_subgraph_requests: bool,
201    ) -> Self {
202        Executor {
203            variable_values,
204            executors,
205            schema_metadata,
206            client_request,
207            headers_plan,
208            dedupe_subgraph_requests,
209            jwt_forwarding_plan,
210        }
211    }
212
213    pub async fn execute_plan_node(
214        &'exec self,
215        ctx: &mut ExecutionContext<'exec>,
216        plan: &'exec PlanNode,
217    ) -> Result<(), PlanExecutionError> {
218        match plan {
219            PlanNode::Fetch(node) => match self.execute_fetch_node(node, None).await {
220                Ok(result) => self.process_job_result(ctx, result),
221                Err(err) => {
222                    self.log_error(&err);
223                    ctx.errors.push(err.into());
224                    Ok(())
225                }
226            },
227            PlanNode::Parallel(node) => {
228                let mut scope = ConcurrencyScope::new();
229
230                for child in &node.nodes {
231                    let job_future = self.prepare_job_future(child, &ctx.final_response);
232                    scope.spawn(job_future);
233                }
234
235                let results = scope.join_all().await;
236
237                for result in results {
238                    match result {
239                        Ok(Some(job)) => {
240                            self.process_job_result(ctx, job)?;
241                        }
242                        Ok(None) => { /* do nothing */ }
243                        Err(err) => {
244                            self.log_error(&err);
245                            ctx.errors.push(err.into())
246                        }
247                    }
248                }
249
250                Ok(())
251            }
252            PlanNode::Sequence(node) => {
253                for child in &node.nodes {
254                    // Box::pin the future for recursive calls to have the correct lifetime
255                    // self.execute_plan_node can call back into execute_sequence_wave
256                    Box::pin(self.execute_plan_node(ctx, child)).await?;
257                }
258
259                Ok(())
260            }
261            PlanNode::Flatten(node) => {
262                match self
263                    .execute_flatten_fetch_node(node, &ctx.final_response)
264                    .await
265                {
266                    Ok(Some(result)) => self.process_job_result(ctx, result),
267                    Ok(None) => Ok(()),
268                    Err(err) => {
269                        self.log_error(&err);
270                        ctx.errors.push(err.into());
271                        Ok(())
272                    }
273                }
274            }
275            PlanNode::Condition(node) => {
276                let Some(node) = condition_node_by_variables(node, self.variable_values) else {
277                    return Ok(());
278                };
279                // Box::pin the future for recursive calls to have the correct lifetime
280                Box::pin(self.execute_plan_node(ctx, node)).await
281            }
282            // Plans produced by our Query Planner can only start with: Fetch, Sequence or Parallel.
283            // Any other node type at the root is not supported, do nothing
284            _ => Ok(()),
285        }
286    }
287
288    fn prepare_job_future<'wave>(
289        &'exec self,
290        node: &'exec PlanNode,
291        final_response: &'wave Value<'exec>,
292    ) -> BoxFuture<'wave, Result<Option<ExecutionJob<'exec>>, PlanExecutionError>> {
293        Box::pin(async move {
294            match node {
295                PlanNode::Fetch(fetch_node) => {
296                    Ok(Some(self.execute_fetch_node(fetch_node, None).await?))
297                }
298                PlanNode::Flatten(flatten_node) => Ok(self
299                    .execute_flatten_fetch_node(flatten_node, final_response)
300                    .await?),
301                PlanNode::Condition(node) => {
302                    let Some(node) = condition_node_by_variables(node, self.variable_values) else {
303                        return Ok(None);
304                    };
305                    self.prepare_job_future(node, final_response).await
306                }
307                // Our Query Planner does not produce any other plan node types in ParallelNode
308                _ => Ok(None),
309            }
310        })
311    }
312
313    fn process_subgraph_response(
314        &self,
315        ctx: &mut ExecutionContext<'exec>,
316        response_bytes: Option<Bytes>,
317        fetch_node_id: i64,
318    ) -> Option<&'exec [FetchRewrite]> {
319        if let Some(response_bytes) = response_bytes {
320            ctx.response_storage.add_response(response_bytes);
321        }
322
323        ctx.output_rewrites.get(fetch_node_id)
324    }
325
326    fn process_job_result(
327        &self,
328        ctx: &mut ExecutionContext<'exec>,
329        job: ExecutionJob<'exec>,
330    ) -> Result<(), PlanExecutionError> {
331        match job {
332            ExecutionJob::Fetch(mut job) => {
333                if let Some(response_headers) = &job.response.headers {
334                    apply_subgraph_response_headers(
335                        self.headers_plan,
336                        job.subgraph_name,
337                        response_headers,
338                        self.client_request,
339                        &mut ctx.response_headers_aggregator,
340                    )
341                    .with_plan_context(LazyPlanContext {
342                        subgraph_name: || Some(job.subgraph_name.into()),
343                        affected_path: || None,
344                    })?;
345                }
346
347                if let Some(output_rewrites) =
348                    self.process_subgraph_response(ctx, job.response.bytes, job.fetch_node_id)
349                {
350                    for output_rewrite in output_rewrites {
351                        output_rewrite
352                            .rewrite(&self.schema_metadata.possible_types, &mut job.response.data);
353                    }
354                }
355
356                ctx.handle_errors(job.subgraph_name, None, job.response.errors, None);
357
358                deep_merge(&mut ctx.final_response, job.response.data);
359            }
360            ExecutionJob::FlattenFetch(mut job) => {
361                if let Some(response_headers) = &job.response.headers {
362                    apply_subgraph_response_headers(
363                        self.headers_plan,
364                        job.subgraph_name,
365                        response_headers,
366                        self.client_request,
367                        &mut ctx.response_headers_aggregator,
368                    )
369                    .with_plan_context(LazyPlanContext {
370                        subgraph_name: || Some(job.subgraph_name.into()),
371                        affected_path: || None,
372                    })?;
373                }
374
375                let output_rewrites =
376                    self.process_subgraph_response(ctx, job.response.bytes, job.fetch_node_id);
377
378                let mut entity_index_error_map: Option<HashMap<&usize, Vec<GraphQLErrorPath>>> =
379                    None;
380
381                if let Some(mut entities) = job.response.data.take_entities() {
382                    if let Some(output_rewrites) = output_rewrites {
383                        for output_rewrite in output_rewrites {
384                            for entity in &mut entities {
385                                output_rewrite
386                                    .rewrite(&self.schema_metadata.possible_types, entity);
387                            }
388                        }
389                    }
390
391                    let mut index = 0;
392                    let normalized_path = job.flatten_node_path.as_slice();
393                    // If there is an error in the response, then collect the paths for normalizing the error
394                    let initial_error_path = job
395                        .response
396                        .errors
397                        .as_ref()
398                        .map(|_| GraphQLErrorPath::with_capacity(normalized_path.len() + 2));
399                    entity_index_error_map = job
400                        .response
401                        .errors
402                        .as_ref()
403                        .map(|_| HashMap::with_capacity(entities.len()));
404                    traverse_and_callback_mut(
405                        &mut ctx.final_response,
406                        normalized_path,
407                        self.schema_metadata,
408                        initial_error_path,
409                        &mut |target, error_path| {
410                            let hash = job.representation_hashes[index];
411                            if let Some(entity_index) = job.representation_hash_to_index.get(&hash)
412                            {
413                                if let (Some(error_path), Some(entity_index_error_map)) =
414                                    (error_path, entity_index_error_map.as_mut())
415                                {
416                                    let error_paths = entity_index_error_map
417                                        .entry(entity_index)
418                                        .or_insert_with(Vec::new);
419                                    error_paths.push(error_path);
420                                }
421                                if let Some(entity) = entities.get(*entity_index) {
422                                    deep_merge(target, entity.clone());
423                                }
424                            }
425                            index += 1;
426                        },
427                    );
428                }
429                ctx.handle_errors(
430                    job.subgraph_name,
431                    Some(job.flatten_node_path),
432                    job.response.errors,
433                    entity_index_error_map,
434                );
435            }
436        }
437        Ok(())
438    }
439
440    async fn execute_flatten_fetch_node<'wave>(
441        &'exec self,
442        flatten_node: &'exec FlattenNode,
443        final_response: &'wave Value<'exec>,
444    ) -> Result<Option<ExecutionJob<'exec>>, PlanExecutionError> {
445        let PlanNode::Fetch(fetch_node) = flatten_node.node.as_ref() else {
446            return Ok(None);
447        };
448
449        let requires_nodes = match fetch_node.requires.as_ref() {
450            Some(nodes) => nodes,
451            None => return Ok(None),
452        };
453
454        let mut index = 0;
455        let normalized_path = flatten_node.path.as_slice();
456        let mut filtered_representations = Vec::new();
457        filtered_representations.put(OPEN_BRACKET);
458        let proj_ctx = RequestProjectionContext::new(&self.schema_metadata.possible_types);
459        let mut representation_hashes: Vec<u64> = Vec::new();
460        let mut filtered_representations_hashes: HashMap<u64, usize> = HashMap::new();
461        let arena = bumpalo::Bump::new();
462
463        traverse_and_callback(
464            final_response,
465            normalized_path,
466            self.schema_metadata,
467            &mut |entity| {
468                let hash = entity.to_hash(&requires_nodes.items, proj_ctx.possible_types);
469
470                if !entity.is_null() {
471                    representation_hashes.push(hash);
472                }
473
474                if filtered_representations_hashes.contains_key(&hash) {
475                    return Ok::<(), PlanExecutionError>(());
476                }
477
478                let entity = if let Some(input_rewrites) = &fetch_node.input_rewrites {
479                    let new_entity = arena.alloc(entity.clone());
480                    for input_rewrite in input_rewrites {
481                        input_rewrite.rewrite(&self.schema_metadata.possible_types, new_entity);
482                    }
483                    new_entity
484                } else {
485                    entity
486                };
487
488                let is_projected = project_requires(
489                    &proj_ctx,
490                    &requires_nodes.items,
491                    entity,
492                    &mut filtered_representations,
493                    filtered_representations_hashes.is_empty(),
494                    None,
495                )
496                .with_plan_context(LazyPlanContext {
497                    subgraph_name: || Some(fetch_node.service_name.clone()),
498                    affected_path: || Some(flatten_node.path.to_string()),
499                })?;
500
501                if is_projected {
502                    filtered_representations_hashes.insert(hash, index);
503                }
504
505                index += 1;
506
507                Ok(())
508            },
509        )?;
510        filtered_representations.put(CLOSE_BRACKET);
511
512        if filtered_representations_hashes.is_empty() {
513            return Ok(None);
514        }
515
516        let ExecutionJob::Fetch(job) = self
517            .execute_fetch_node(fetch_node, Some(filtered_representations))
518            .await?
519        else {
520            return Ok(None);
521        };
522
523        Ok(Some(ExecutionJob::FlattenFetch(FlattenFetchJob {
524            flatten_node_path: &flatten_node.path,
525            response: job.response,
526            fetch_node_id: fetch_node.id,
527            subgraph_name: &fetch_node.service_name,
528            representation_hashes,
529            representation_hash_to_index: filtered_representations_hashes,
530        })))
531    }
532
533    async fn execute_fetch_node(
534        &'exec self,
535        node: &'exec FetchNode,
536        representations: Option<Vec<u8>>,
537    ) -> Result<ExecutionJob<'exec>, PlanExecutionError> {
538        let subgraph_operation_span = GraphQLSubgraphOperationSpan::new(
539            node.service_name.as_str(),
540            &node.operation.document_str,
541        );
542
543        async {
544            // TODO: We could optimize header map creation by caching them per service name
545            let mut headers_map = HeaderMap::new();
546            modify_subgraph_request_headers(
547                self.headers_plan,
548                &node.service_name,
549                self.client_request,
550                &mut headers_map,
551            )
552            .with_plan_context(LazyPlanContext {
553                subgraph_name: || Some(node.service_name.clone()),
554                affected_path: || None,
555            })?;
556            let variable_refs =
557                select_fetch_variables(self.variable_values, node.variable_usages.as_ref());
558
559            let mut subgraph_request = SubgraphExecutionRequest {
560                query: node.operation.document_str.as_str(),
561                dedupe: self.dedupe_subgraph_requests,
562                operation_name: node.operation_name.as_deref(),
563                variables: variable_refs,
564                representations,
565                headers: headers_map,
566                extensions: None,
567            };
568
569            subgraph_operation_span.record_operation_identity(GraphQLSpanOperationIdentity {
570                name: subgraph_request.operation_name,
571                operation_type: match node.operation_kind {
572                    Some(OperationKind::Query) | None => "query",
573                    Some(OperationKind::Mutation) => "mutation",
574                    Some(OperationKind::Subscription) => "subscription",
575                },
576                client_document_hash: node.operation.hash.to_string().as_str(),
577            });
578
579            if let Some(jwt_forwarding_plan) = &self.jwt_forwarding_plan {
580                subgraph_request.add_request_extensions_field(
581                    jwt_forwarding_plan.extension_field_name.clone(),
582                    jwt_forwarding_plan.extension_field_value.clone(),
583                );
584            }
585
586            let response = self
587                .executors
588                .execute(&node.service_name, subgraph_request, self.client_request)
589                .await
590                .with_plan_context(LazyPlanContext {
591                    subgraph_name: || Some(node.service_name.clone()),
592                    affected_path: || None,
593                })?;
594
595            if let Some(errors) = &response.errors {
596                if !errors.is_empty() {
597                    subgraph_operation_span.record_error_count(errors.len());
598                    subgraph_operation_span
599                        .record_errors(|| errors.iter().map(|e| e.into()).collect());
600                }
601            }
602
603            Ok(ExecutionJob::Fetch(FetchJob {
604                fetch_node_id: node.id,
605                subgraph_name: &node.service_name,
606                response,
607            }))
608        }
609        .instrument(subgraph_operation_span.clone())
610        .await
611    }
612
613    fn log_error(&self, error: &PlanExecutionError) {
614        tracing::error!(
615            subgraph_name = error.subgraph_name(),
616            error = error as &dyn std::error::Error,
617            "Plan execution error"
618        );
619    }
620}
621
622fn condition_node_by_variables<'a>(
623    condition_node: &'a ConditionNode,
624    variable_values: &'a Option<HashMap<String, sonic_rs::Value>>,
625) -> Option<&'a PlanNode> {
626    let vars = variable_values.as_ref()?;
627    let value = vars.get(&condition_node.condition)?;
628    let condition_met = matches!(value.as_ref(), ValueRef::Bool(true));
629
630    if condition_met {
631        condition_node.if_clause.as_deref()
632    } else {
633        condition_node.else_clause.as_deref()
634    }
635}
636
637fn select_fetch_variables<'a>(
638    variable_values: &'a Option<HashMap<String, sonic_rs::Value>>,
639    variable_usages: Option<&BTreeSet<String>>,
640) -> Option<HashMap<&'a str, &'a sonic_rs::Value>> {
641    let values = variable_values.as_ref()?;
642
643    variable_usages.map(|variable_usages| {
644        variable_usages
645            .iter()
646            .filter_map(|var_name| {
647                values
648                    .get_key_value(var_name.as_str())
649                    .map(|(key, value)| (key.as_str(), value))
650            })
651            .collect()
652    })
653}
654
655#[cfg(test)]
656mod tests {
657    use crate::{
658        context::ExecutionContext,
659        response::graphql_error::{GraphQLErrorExtensions, GraphQLErrorPath},
660    };
661
662    use super::select_fetch_variables;
663    use sonic_rs::Value;
664    use std::collections::{BTreeSet, HashMap};
665
666    fn value_from_number(n: i32) -> Value {
667        sonic_rs::from_str(&n.to_string()).unwrap()
668    }
669
670    #[test]
671    fn select_fetch_variables_only_used_variables() {
672        let mut variable_values_map = HashMap::new();
673        variable_values_map.insert("used".to_string(), value_from_number(1));
674        variable_values_map.insert("unused".to_string(), value_from_number(2));
675        let variable_values = Some(variable_values_map);
676
677        let mut usages = BTreeSet::new();
678        usages.insert("used".to_string());
679
680        let selected = select_fetch_variables(&variable_values, Some(&usages)).unwrap();
681
682        assert_eq!(selected.len(), 1);
683        assert!(selected.contains_key("used"));
684        assert!(!selected.contains_key("unused"));
685    }
686
687    #[test]
688    fn select_fetch_variables_ignores_missing_usage_entries() {
689        let mut variable_values_map = HashMap::new();
690        variable_values_map.insert("present".to_string(), value_from_number(3));
691        let variable_values = Some(variable_values_map);
692
693        let mut usages = BTreeSet::new();
694        usages.insert("present".to_string());
695        usages.insert("missing".to_string());
696
697        let selected = select_fetch_variables(&variable_values, Some(&usages)).unwrap();
698
699        assert_eq!(selected.len(), 1);
700        assert!(selected.contains_key("present"));
701        assert!(!selected.contains_key("missing"));
702    }
703
704    #[test]
705    fn select_fetch_variables_for_no_usage_entries() {
706        let mut variable_values_map = HashMap::new();
707        variable_values_map.insert("unused_1".to_string(), value_from_number(1));
708        variable_values_map.insert("unused_2".to_string(), value_from_number(2));
709
710        let variable_values = Some(variable_values_map);
711
712        let selected = select_fetch_variables(&variable_values, None);
713
714        assert!(selected.is_none());
715    }
716    #[test]
717    /**
718     * We have the same entity in two different paths ["a", 0] and ["b", 1],
719     * and the subgraph response has an error for this entity.
720     * So we should duplicate the error for both paths.
721     */
722    fn normalize_entity_errors_correctly() {
723        use crate::response::graphql_error::{GraphQLError, GraphQLErrorPathSegment};
724        use std::collections::HashMap;
725        let mut ctx = ExecutionContext::default();
726        let mut entity_index_error_map: HashMap<&usize, Vec<GraphQLErrorPath>> = HashMap::new();
727        entity_index_error_map.insert(
728            &0,
729            vec![
730                GraphQLErrorPath {
731                    segments: vec![
732                        GraphQLErrorPathSegment::String("a".to_string()),
733                        GraphQLErrorPathSegment::Index(0),
734                    ],
735                },
736                GraphQLErrorPath {
737                    segments: vec![
738                        GraphQLErrorPathSegment::String("b".to_string()),
739                        GraphQLErrorPathSegment::Index(1),
740                    ],
741                },
742            ],
743        );
744        let response_errors = vec![GraphQLError {
745            message: "Error 1".to_string(),
746            locations: None,
747            path: Some(GraphQLErrorPath {
748                segments: vec![
749                    GraphQLErrorPathSegment::String("_entities".to_string()),
750                    GraphQLErrorPathSegment::Index(0),
751                    GraphQLErrorPathSegment::String("field1".to_string()),
752                ],
753            }),
754            extensions: GraphQLErrorExtensions::default(),
755        }];
756        ctx.handle_errors(
757            "subgraph_a",
758            None,
759            Some(response_errors),
760            Some(entity_index_error_map),
761        );
762        assert_eq!(ctx.errors.len(), 2);
763        assert_eq!(ctx.errors[0].message, "Error 1");
764        assert_eq!(
765            ctx.errors[0].path.as_ref().unwrap().segments,
766            vec![
767                GraphQLErrorPathSegment::String("a".to_string()),
768                GraphQLErrorPathSegment::Index(0),
769                GraphQLErrorPathSegment::String("field1".to_string())
770            ]
771        );
772        assert_eq!(ctx.errors[1].message, "Error 1");
773        assert_eq!(
774            ctx.errors[1].path.as_ref().unwrap().segments,
775            vec![
776                GraphQLErrorPathSegment::String("b".to_string()),
777                GraphQLErrorPathSegment::Index(1),
778                GraphQLErrorPathSegment::String("field1".to_string())
779            ]
780        );
781    }
782}