hive_router_plan_executor/execution/
plan.rs

1use std::collections::{BTreeSet, HashMap};
2
3use bytes::{BufMut, Bytes};
4use futures::{future::BoxFuture, stream::FuturesUnordered, StreamExt};
5use hive_router_query_planner::planner::plan_nodes::{
6    ConditionNode, FetchNode, FetchRewrite, FlattenNode, FlattenNodePath, ParallelNode, PlanNode,
7    QueryPlan, SequenceNode,
8};
9use http::HeaderMap;
10use sonic_rs::ValueRef;
11
12use crate::{
13    context::ExecutionContext,
14    execution::{
15        client_request_details::ClientRequestDetails,
16        error::{IntoPlanExecutionError, LazyPlanContext, PlanExecutionError},
17        jwt_forward::JwtAuthForwardingPlan,
18        rewrites::FetchRewriteExt,
19    },
20    executors::{common::SubgraphExecutionRequest, map::SubgraphExecutorMap},
21    headers::{
22        plan::HeaderRulesPlan,
23        request::modify_subgraph_request_headers,
24        response::{apply_subgraph_response_headers, modify_client_response_headers},
25    },
26    introspection::{
27        resolve::{resolve_introspection, IntrospectionContext},
28        schema::SchemaMetadata,
29    },
30    projection::{
31        plan::FieldProjectionPlan,
32        request::{project_requires, RequestProjectionContext},
33        response::project_by_operation,
34    },
35    response::{
36        graphql_error::{GraphQLError, GraphQLErrorPath},
37        merge::deep_merge,
38        subgraph_response::SubgraphResponse,
39        value::Value,
40    },
41    utils::{
42        consts::{CLOSE_BRACKET, OPEN_BRACKET},
43        traverse::{traverse_and_callback, traverse_and_callback_mut},
44    },
45};
46
47pub struct QueryPlanExecutionContext<'exec> {
48    pub query_plan: &'exec QueryPlan,
49    pub projection_plan: &'exec [FieldProjectionPlan],
50    pub headers_plan: &'exec HeaderRulesPlan,
51    pub variable_values: &'exec Option<HashMap<String, sonic_rs::Value>>,
52    pub extensions: Option<HashMap<String, sonic_rs::Value>>,
53    pub client_request: &'exec ClientRequestDetails<'exec>,
54    pub introspection_context: &'exec IntrospectionContext<'exec, 'static>,
55    pub operation_type_name: &'exec str,
56    pub executors: &'exec SubgraphExecutorMap,
57    pub jwt_auth_forwarding: &'exec Option<JwtAuthForwardingPlan>,
58    pub initial_errors: Vec<GraphQLError>,
59}
60
61pub struct PlanExecutionOutput {
62    pub body: Vec<u8>,
63    pub headers: HeaderMap,
64    pub error_count: usize,
65}
66
67pub async fn execute_query_plan<'exec>(
68    ctx: QueryPlanExecutionContext<'exec>,
69) -> Result<PlanExecutionOutput, PlanExecutionError> {
70    let init_value = if let Some(introspection_query) = ctx.introspection_context.query {
71        resolve_introspection(introspection_query, ctx.introspection_context)
72    } else if ctx.projection_plan.is_empty() {
73        Value::Null
74    } else {
75        Value::Object(Vec::new())
76    };
77
78    let mut exec_ctx = ExecutionContext::new(ctx.query_plan, init_value, ctx.initial_errors);
79    let executor = Executor::new(
80        ctx.variable_values,
81        ctx.executors,
82        ctx.introspection_context.metadata,
83        ctx.client_request,
84        ctx.headers_plan,
85        ctx.jwt_auth_forwarding,
86        // Deduplicate subgraph requests only if the operation type is a query
87        ctx.operation_type_name == "Query",
88    );
89
90    if ctx.query_plan.node.is_some() {
91        executor
92            .execute(&mut exec_ctx, ctx.query_plan.node.as_ref())
93            .await?;
94    }
95
96    let mut response_headers = HeaderMap::new();
97    modify_client_response_headers(exec_ctx.response_headers_aggregator, &mut response_headers)
98        .with_plan_context(LazyPlanContext {
99            subgraph_name: || None,
100            affected_path: || None,
101        })?;
102
103    let final_response = &exec_ctx.final_response;
104    let error_count = exec_ctx.errors.len(); // Added for usage reporting
105    let body = project_by_operation(
106        final_response,
107        exec_ctx.errors,
108        &ctx.extensions,
109        ctx.operation_type_name,
110        ctx.projection_plan,
111        ctx.variable_values,
112        exec_ctx.response_storage.estimate_final_response_size(),
113        ctx.introspection_context.metadata,
114    )
115    .with_plan_context(LazyPlanContext {
116        subgraph_name: || None,
117        affected_path: || None,
118    })?;
119
120    Ok(PlanExecutionOutput {
121        body,
122        headers: response_headers,
123        error_count,
124    })
125}
126
127pub struct Executor<'exec> {
128    variable_values: &'exec Option<HashMap<String, sonic_rs::Value>>,
129    schema_metadata: &'exec SchemaMetadata,
130    executors: &'exec SubgraphExecutorMap,
131    client_request: &'exec ClientRequestDetails<'exec>,
132    headers_plan: &'exec HeaderRulesPlan,
133    jwt_forwarding_plan: &'exec Option<JwtAuthForwardingPlan>,
134    dedupe_subgraph_requests: bool,
135}
136
137struct ConcurrencyScope<'exec, T> {
138    jobs: FuturesUnordered<BoxFuture<'exec, T>>,
139}
140
141impl<'exec, T> ConcurrencyScope<'exec, T> {
142    fn new() -> Self {
143        Self {
144            jobs: FuturesUnordered::new(),
145        }
146    }
147
148    fn spawn(&mut self, future: BoxFuture<'exec, T>) {
149        self.jobs.push(future);
150    }
151
152    async fn join_all(mut self) -> Vec<T> {
153        let mut results = Vec::with_capacity(self.jobs.len());
154        while let Some(result) = self.jobs.next().await {
155            results.push(result);
156        }
157        results
158    }
159}
160
161struct FetchJob<'exec> {
162    fetch_node_id: i64,
163    subgraph_name: &'exec str,
164    response: SubgraphResponse<'exec>,
165}
166
167struct FlattenFetchJob<'exec> {
168    flatten_node_path: &'exec FlattenNodePath,
169    response: SubgraphResponse<'exec>,
170    fetch_node_id: i64,
171    subgraph_name: &'exec str,
172    representation_hashes: Vec<u64>,
173    representation_hash_to_index: HashMap<u64, usize>,
174}
175
176enum ExecutionJob<'exec> {
177    Fetch(FetchJob<'exec>),
178    FlattenFetch(FlattenFetchJob<'exec>),
179    None,
180}
181
182struct PreparedFlattenData {
183    representations: Vec<u8>,
184    representation_hashes: Vec<u64>,
185    representation_hash_to_index: HashMap<u64, usize>,
186}
187
188impl<'exec> Executor<'exec> {
189    pub fn new(
190        variable_values: &'exec Option<HashMap<String, sonic_rs::Value>>,
191        executors: &'exec SubgraphExecutorMap,
192        schema_metadata: &'exec SchemaMetadata,
193        client_request: &'exec ClientRequestDetails<'exec>,
194        headers_plan: &'exec HeaderRulesPlan,
195        jwt_forwarding_plan: &'exec Option<JwtAuthForwardingPlan>,
196        dedupe_subgraph_requests: bool,
197    ) -> Self {
198        Executor {
199            variable_values,
200            executors,
201            schema_metadata,
202            client_request,
203            headers_plan,
204            dedupe_subgraph_requests,
205            jwt_forwarding_plan,
206        }
207    }
208
209    pub async fn execute(
210        &'exec self,
211        ctx: &mut ExecutionContext<'exec>,
212        plan: Option<&'exec PlanNode>,
213    ) -> Result<(), PlanExecutionError> {
214        match plan {
215            Some(PlanNode::Fetch(node)) => self.execute_fetch_wave(ctx, node).await,
216            Some(PlanNode::Parallel(node)) => self.execute_parallel_wave(ctx, node).await,
217            Some(PlanNode::Sequence(node)) => self.execute_sequence_wave(ctx, node).await,
218            // Plans produced by our Query Planner can only start with: Fetch, Sequence or Parallel.
219            // Any other node type at the root is not supported, do nothing
220            Some(_) => Ok(()),
221            // An empty plan is valid, just do nothing
222            None => Ok(()),
223        }
224    }
225
226    async fn execute_fetch_wave(
227        &'exec self,
228        ctx: &mut ExecutionContext<'exec>,
229        node: &'exec FetchNode,
230    ) -> Result<(), PlanExecutionError> {
231        match self.execute_fetch_node(node, None).await {
232            Ok(result) => self.process_job_result(ctx, result),
233            Err(err) => {
234                self.log_error(&err);
235                ctx.errors.push(err.into());
236                Ok(())
237            }
238        }
239    }
240
241    async fn execute_sequence_wave(
242        &'exec self,
243        ctx: &mut ExecutionContext<'exec>,
244        node: &'exec SequenceNode,
245    ) -> Result<(), PlanExecutionError> {
246        for child in &node.nodes {
247            Box::pin(self.execute_plan_node(ctx, child)).await?;
248        }
249
250        Ok(())
251    }
252
253    async fn execute_parallel_wave(
254        &'exec self,
255        ctx: &mut ExecutionContext<'exec>,
256        node: &'exec ParallelNode,
257    ) -> Result<(), PlanExecutionError> {
258        let mut scope = ConcurrencyScope::new();
259
260        for child in &node.nodes {
261            let job_future = self.prepare_job_future(child, &ctx.final_response);
262            scope.spawn(job_future);
263        }
264
265        let results = scope.join_all().await;
266
267        for result in results {
268            match result {
269                Ok(job) => {
270                    self.process_job_result(ctx, job)?;
271                }
272                Err(err) => {
273                    self.log_error(&err);
274                    ctx.errors.push(err.into())
275                }
276            }
277        }
278
279        Ok(())
280    }
281
282    async fn execute_plan_node(
283        &'exec self,
284        ctx: &mut ExecutionContext<'exec>,
285        node: &'exec PlanNode,
286    ) -> Result<(), PlanExecutionError> {
287        match node {
288            PlanNode::Fetch(fetch_node) => match self.execute_fetch_node(fetch_node, None).await {
289                Ok(job) => {
290                    self.process_job_result(ctx, job)?;
291                }
292                Err(err) => {
293                    self.log_error(&err);
294                    ctx.errors.push(err.into());
295                }
296            },
297            PlanNode::Parallel(parallel_node) => {
298                self.execute_parallel_wave(ctx, parallel_node).await?;
299            }
300            PlanNode::Flatten(flatten_node) => {
301                match self.prepare_flatten_data(&ctx.final_response, flatten_node) {
302                    Ok(Some(p)) => {
303                        match self
304                            .execute_flatten_fetch_node(
305                                flatten_node,
306                                Some(p.representations),
307                                Some(p.representation_hashes),
308                                Some(p.representation_hash_to_index),
309                            )
310                            .await
311                        {
312                            Ok(job) => {
313                                self.process_job_result(ctx, job)?;
314                            }
315                            Err(err) => {
316                                self.log_error(&err);
317                                ctx.errors.push(err.into());
318                            }
319                        }
320                    }
321                    Ok(None) => { /* do nothing */ }
322                    Err(err) => {
323                        self.log_error(&err);
324                        ctx.errors.push(err.into());
325                    }
326                }
327            }
328            PlanNode::Sequence(sequence_node) => {
329                self.execute_sequence_wave(ctx, sequence_node).await?;
330            }
331            PlanNode::Condition(condition_node) => {
332                if let Some(node) =
333                    condition_node_by_variables(condition_node, self.variable_values)
334                {
335                    Box::pin(self.execute_plan_node(ctx, node)).await?;
336                }
337            }
338            // An unsupported plan node was found, do nothing.
339            _ => {}
340        }
341
342        Ok(())
343    }
344
345    fn prepare_job_future(
346        &'exec self,
347        node: &'exec PlanNode,
348        final_response: &Value<'exec>,
349    ) -> BoxFuture<'exec, Result<ExecutionJob<'exec>, PlanExecutionError>> {
350        match node {
351            PlanNode::Fetch(fetch_node) => Box::pin(self.execute_fetch_node(fetch_node, None)),
352            PlanNode::Flatten(flatten_node) => {
353                match self.prepare_flatten_data(final_response, flatten_node) {
354                    Ok(Some(p)) => Box::pin(self.execute_flatten_fetch_node(
355                        flatten_node,
356                        Some(p.representations),
357                        Some(p.representation_hashes),
358                        Some(p.representation_hash_to_index),
359                    )),
360                    Ok(None) => Box::pin(async { Ok(ExecutionJob::None) }),
361                    Err(e) => Box::pin(async move { Err(e) }),
362                }
363            }
364            PlanNode::Condition(node) => {
365                match condition_node_by_variables(node, self.variable_values) {
366                    Some(node) => Box::pin(self.prepare_job_future(node, final_response)), // This is already clean.
367                    None => Box::pin(async { Ok(ExecutionJob::None) }),
368                }
369            }
370            // Our Query Planner does not produce any other plan node types in ParallelNode
371            _ => Box::pin(async { Ok(ExecutionJob::None) }),
372        }
373    }
374
375    fn process_subgraph_response(
376        &self,
377        ctx: &mut ExecutionContext<'exec>,
378        response_bytes: Option<Bytes>,
379        fetch_node_id: i64,
380    ) -> Option<&'exec [FetchRewrite]> {
381        if let Some(response_bytes) = response_bytes {
382            ctx.response_storage.add_response(response_bytes);
383        }
384
385        ctx.output_rewrites.get(fetch_node_id)
386    }
387
388    fn process_job_result(
389        &self,
390        ctx: &mut ExecutionContext<'exec>,
391        job: ExecutionJob<'exec>,
392    ) -> Result<(), PlanExecutionError> {
393        match job {
394            ExecutionJob::Fetch(mut job) => {
395                if let Some(response_headers) = &job.response.headers {
396                    apply_subgraph_response_headers(
397                        self.headers_plan,
398                        job.subgraph_name,
399                        response_headers,
400                        self.client_request,
401                        &mut ctx.response_headers_aggregator,
402                    )
403                    .with_plan_context(LazyPlanContext {
404                        subgraph_name: || Some(job.subgraph_name.into()),
405                        affected_path: || None,
406                    })?;
407                }
408
409                if let Some(output_rewrites) =
410                    self.process_subgraph_response(ctx, job.response.bytes, job.fetch_node_id)
411                {
412                    for output_rewrite in output_rewrites {
413                        output_rewrite
414                            .rewrite(&self.schema_metadata.possible_types, &mut job.response.data);
415                    }
416                }
417
418                ctx.handle_errors(job.subgraph_name, None, job.response.errors, None);
419
420                deep_merge(&mut ctx.final_response, job.response.data);
421            }
422            ExecutionJob::FlattenFetch(mut job) => {
423                if let Some(response_headers) = &job.response.headers {
424                    apply_subgraph_response_headers(
425                        self.headers_plan,
426                        job.subgraph_name,
427                        response_headers,
428                        self.client_request,
429                        &mut ctx.response_headers_aggregator,
430                    )
431                    .with_plan_context(LazyPlanContext {
432                        subgraph_name: || Some(job.subgraph_name.into()),
433                        affected_path: || None,
434                    })?;
435                }
436
437                let output_rewrites =
438                    self.process_subgraph_response(ctx, job.response.bytes, job.fetch_node_id);
439
440                let mut entity_index_error_map: Option<HashMap<&usize, Vec<GraphQLErrorPath>>> =
441                    None;
442
443                if let Some(mut entities) = job.response.data.take_entities() {
444                    if let Some(output_rewrites) = output_rewrites {
445                        for output_rewrite in output_rewrites {
446                            for entity in &mut entities {
447                                output_rewrite
448                                    .rewrite(&self.schema_metadata.possible_types, entity);
449                            }
450                        }
451                    }
452
453                    let mut index = 0;
454                    let normalized_path = job.flatten_node_path.as_slice();
455                    // If there is an error in the response, then collect the paths for normalizing the error
456                    let initial_error_path = job
457                        .response
458                        .errors
459                        .as_ref()
460                        .map(|_| GraphQLErrorPath::with_capacity(normalized_path.len() + 2));
461                    entity_index_error_map = job
462                        .response
463                        .errors
464                        .as_ref()
465                        .map(|_| HashMap::with_capacity(entities.len()));
466                    traverse_and_callback_mut(
467                        &mut ctx.final_response,
468                        normalized_path,
469                        self.schema_metadata,
470                        initial_error_path,
471                        &mut |target, error_path| {
472                            let hash = job.representation_hashes[index];
473                            if let Some(entity_index) = job.representation_hash_to_index.get(&hash)
474                            {
475                                if let (Some(error_path), Some(entity_index_error_map)) =
476                                    (error_path, entity_index_error_map.as_mut())
477                                {
478                                    let error_paths = entity_index_error_map
479                                        .entry(entity_index)
480                                        .or_insert_with(Vec::new);
481                                    error_paths.push(error_path);
482                                }
483                                if let Some(entity) = entities.get(*entity_index) {
484                                    deep_merge(target, entity.clone());
485                                }
486                            }
487                            index += 1;
488                        },
489                    );
490                }
491                ctx.handle_errors(
492                    job.subgraph_name,
493                    Some(job.flatten_node_path),
494                    job.response.errors,
495                    entity_index_error_map,
496                );
497            }
498            ExecutionJob::None => {
499                // nothing to do
500            }
501        }
502        Ok(())
503    }
504
505    fn prepare_flatten_data(
506        &self,
507        final_response: &Value<'exec>,
508        flatten_node: &'exec FlattenNode,
509    ) -> Result<Option<PreparedFlattenData>, PlanExecutionError> {
510        let fetch_node = match flatten_node.node.as_ref() {
511            PlanNode::Fetch(fetch_node) => fetch_node,
512            _ => return Ok(None),
513        };
514        let requires_nodes = match fetch_node.requires.as_ref() {
515            Some(nodes) => nodes,
516            None => return Ok(None),
517        };
518
519        let mut index = 0;
520        let normalized_path = flatten_node.path.as_slice();
521        let mut filtered_representations = Vec::new();
522        filtered_representations.put(OPEN_BRACKET);
523        let proj_ctx = RequestProjectionContext::new(&self.schema_metadata.possible_types);
524        let mut representation_hashes: Vec<u64> = Vec::new();
525        let mut filtered_representations_hashes: HashMap<u64, usize> = HashMap::new();
526        let arena = bumpalo::Bump::new();
527
528        traverse_and_callback(
529            final_response,
530            normalized_path,
531            self.schema_metadata,
532            &mut |entity| {
533                let hash = entity.to_hash(&requires_nodes.items, proj_ctx.possible_types);
534
535                if !entity.is_null() {
536                    representation_hashes.push(hash);
537                }
538
539                if filtered_representations_hashes.contains_key(&hash) {
540                    return Ok::<(), PlanExecutionError>(());
541                }
542
543                let entity = if let Some(input_rewrites) = &fetch_node.input_rewrites {
544                    let new_entity = arena.alloc(entity.clone());
545                    for input_rewrite in input_rewrites {
546                        input_rewrite.rewrite(&self.schema_metadata.possible_types, new_entity);
547                    }
548                    new_entity
549                } else {
550                    entity
551                };
552
553                let is_projected = project_requires(
554                    &proj_ctx,
555                    &requires_nodes.items,
556                    entity,
557                    &mut filtered_representations,
558                    filtered_representations_hashes.is_empty(),
559                    None,
560                )
561                .with_plan_context(LazyPlanContext {
562                    subgraph_name: || Some(fetch_node.service_name.clone()),
563                    affected_path: || Some(flatten_node.path.to_string()),
564                })?;
565
566                if is_projected {
567                    filtered_representations_hashes.insert(hash, index);
568                }
569
570                index += 1;
571
572                Ok(())
573            },
574        )?;
575        filtered_representations.put(CLOSE_BRACKET);
576
577        if filtered_representations_hashes.is_empty() {
578            return Ok(None);
579        }
580
581        Ok(Some(PreparedFlattenData {
582            representations: filtered_representations,
583            representation_hashes,
584            representation_hash_to_index: filtered_representations_hashes,
585        }))
586    }
587
588    async fn execute_flatten_fetch_node(
589        &'exec self,
590        node: &'exec FlattenNode,
591        representations: Option<Vec<u8>>,
592        representation_hashes: Option<Vec<u64>>,
593        filtered_representations_hashes: Option<HashMap<u64, usize>>,
594    ) -> Result<ExecutionJob<'exec>, PlanExecutionError> {
595        let fetch_node = match node.node.as_ref() {
596            PlanNode::Fetch(fetch_node) => fetch_node,
597            _ => return Ok(ExecutionJob::None),
598        };
599
600        match self.execute_fetch_node(fetch_node, representations).await? {
601            ExecutionJob::Fetch(job) => Ok(ExecutionJob::FlattenFetch(FlattenFetchJob {
602                flatten_node_path: &node.path,
603                response: job.response,
604                fetch_node_id: job.fetch_node_id,
605                subgraph_name: job.subgraph_name,
606                representation_hashes: representation_hashes.unwrap_or_default(),
607                representation_hash_to_index: filtered_representations_hashes.unwrap_or_default(),
608            })),
609            _ => Ok(ExecutionJob::None),
610        }
611    }
612
613    async fn execute_fetch_node(
614        &'exec self,
615        node: &'exec FetchNode,
616        representations: Option<Vec<u8>>,
617    ) -> Result<ExecutionJob<'exec>, PlanExecutionError> {
618        // TODO: We could optimize header map creation by caching them per service name
619        let mut headers_map = HeaderMap::new();
620        modify_subgraph_request_headers(
621            self.headers_plan,
622            &node.service_name,
623            self.client_request,
624            &mut headers_map,
625        )
626        .with_plan_context(LazyPlanContext {
627            subgraph_name: || Some(node.service_name.clone()),
628            affected_path: || None,
629        })?;
630        let variable_refs =
631            select_fetch_variables(self.variable_values, node.variable_usages.as_ref());
632
633        let mut subgraph_request = SubgraphExecutionRequest {
634            query: node.operation.document_str.as_str(),
635            dedupe: self.dedupe_subgraph_requests,
636            operation_name: node.operation_name.as_deref(),
637            variables: variable_refs,
638            representations,
639            headers: headers_map,
640            extensions: None,
641        };
642
643        if let Some(jwt_forwarding_plan) = &self.jwt_forwarding_plan {
644            subgraph_request.add_request_extensions_field(
645                jwt_forwarding_plan.extension_field_name.clone(),
646                jwt_forwarding_plan.extension_field_value.clone(),
647            );
648        }
649
650        Ok(ExecutionJob::Fetch(FetchJob {
651            fetch_node_id: node.id,
652            subgraph_name: &node.service_name,
653            response: self
654                .executors
655                .execute(&node.service_name, subgraph_request, self.client_request)
656                .await,
657        }))
658    }
659
660    fn log_error(&self, error: &PlanExecutionError) {
661        tracing::error!(
662            subgraph_name = error.subgraph_name(),
663            error = error as &dyn std::error::Error,
664            "Plan execution error"
665        );
666    }
667}
668
669fn condition_node_by_variables<'a>(
670    condition_node: &'a ConditionNode,
671    variable_values: &'a Option<HashMap<String, sonic_rs::Value>>,
672) -> Option<&'a PlanNode> {
673    let vars = variable_values.as_ref()?;
674    let value = vars.get(&condition_node.condition)?;
675    let condition_met = matches!(value.as_ref(), ValueRef::Bool(true));
676
677    if condition_met {
678        condition_node.if_clause.as_deref()
679    } else {
680        condition_node.else_clause.as_deref()
681    }
682}
683
684fn select_fetch_variables<'a>(
685    variable_values: &'a Option<HashMap<String, sonic_rs::Value>>,
686    variable_usages: Option<&BTreeSet<String>>,
687) -> Option<HashMap<&'a str, &'a sonic_rs::Value>> {
688    let values = variable_values.as_ref()?;
689
690    variable_usages.map(|variable_usages| {
691        variable_usages
692            .iter()
693            .filter_map(|var_name| {
694                values
695                    .get_key_value(var_name.as_str())
696                    .map(|(key, value)| (key.as_str(), value))
697            })
698            .collect()
699    })
700}
701
702#[cfg(test)]
703mod tests {
704    use crate::{
705        context::ExecutionContext,
706        response::graphql_error::{GraphQLErrorExtensions, GraphQLErrorPath},
707    };
708
709    use super::select_fetch_variables;
710    use sonic_rs::Value;
711    use std::collections::{BTreeSet, HashMap};
712
713    fn value_from_number(n: i32) -> Value {
714        sonic_rs::from_str(&n.to_string()).unwrap()
715    }
716
717    #[test]
718    fn select_fetch_variables_only_used_variables() {
719        let mut variable_values_map = HashMap::new();
720        variable_values_map.insert("used".to_string(), value_from_number(1));
721        variable_values_map.insert("unused".to_string(), value_from_number(2));
722        let variable_values = Some(variable_values_map);
723
724        let mut usages = BTreeSet::new();
725        usages.insert("used".to_string());
726
727        let selected = select_fetch_variables(&variable_values, Some(&usages)).unwrap();
728
729        assert_eq!(selected.len(), 1);
730        assert!(selected.contains_key("used"));
731        assert!(!selected.contains_key("unused"));
732    }
733
734    #[test]
735    fn select_fetch_variables_ignores_missing_usage_entries() {
736        let mut variable_values_map = HashMap::new();
737        variable_values_map.insert("present".to_string(), value_from_number(3));
738        let variable_values = Some(variable_values_map);
739
740        let mut usages = BTreeSet::new();
741        usages.insert("present".to_string());
742        usages.insert("missing".to_string());
743
744        let selected = select_fetch_variables(&variable_values, Some(&usages)).unwrap();
745
746        assert_eq!(selected.len(), 1);
747        assert!(selected.contains_key("present"));
748        assert!(!selected.contains_key("missing"));
749    }
750
751    #[test]
752    fn select_fetch_variables_for_no_usage_entries() {
753        let mut variable_values_map = HashMap::new();
754        variable_values_map.insert("unused_1".to_string(), value_from_number(1));
755        variable_values_map.insert("unused_2".to_string(), value_from_number(2));
756
757        let variable_values = Some(variable_values_map);
758
759        let selected = select_fetch_variables(&variable_values, None);
760
761        assert!(selected.is_none());
762    }
763    #[test]
764    /**
765     * We have the same entity in two different paths ["a", 0] and ["b", 1],
766     * and the subgraph response has an error for this entity.
767     * So we should duplicate the error for both paths.
768     */
769    fn normalize_entity_errors_correctly() {
770        use crate::response::graphql_error::{GraphQLError, GraphQLErrorPathSegment};
771        use std::collections::HashMap;
772        let mut ctx = ExecutionContext::default();
773        let mut entity_index_error_map: HashMap<&usize, Vec<GraphQLErrorPath>> = HashMap::new();
774        entity_index_error_map.insert(
775            &0,
776            vec![
777                GraphQLErrorPath {
778                    segments: vec![
779                        GraphQLErrorPathSegment::String("a".to_string()),
780                        GraphQLErrorPathSegment::Index(0),
781                    ],
782                },
783                GraphQLErrorPath {
784                    segments: vec![
785                        GraphQLErrorPathSegment::String("b".to_string()),
786                        GraphQLErrorPathSegment::Index(1),
787                    ],
788                },
789            ],
790        );
791        let response_errors = vec![GraphQLError {
792            message: "Error 1".to_string(),
793            locations: None,
794            path: Some(GraphQLErrorPath {
795                segments: vec![
796                    GraphQLErrorPathSegment::String("_entities".to_string()),
797                    GraphQLErrorPathSegment::Index(0),
798                    GraphQLErrorPathSegment::String("field1".to_string()),
799                ],
800            }),
801            extensions: GraphQLErrorExtensions::default(),
802        }];
803        ctx.handle_errors(
804            "subgraph_a",
805            None,
806            Some(response_errors),
807            Some(entity_index_error_map),
808        );
809        assert_eq!(ctx.errors.len(), 2);
810        assert_eq!(ctx.errors[0].message, "Error 1");
811        assert_eq!(
812            ctx.errors[0].path.as_ref().unwrap().segments,
813            vec![
814                GraphQLErrorPathSegment::String("a".to_string()),
815                GraphQLErrorPathSegment::Index(0),
816                GraphQLErrorPathSegment::String("field1".to_string())
817            ]
818        );
819        assert_eq!(ctx.errors[1].message, "Error 1");
820        assert_eq!(
821            ctx.errors[1].path.as_ref().unwrap().segments,
822            vec![
823                GraphQLErrorPathSegment::String("b".to_string()),
824                GraphQLErrorPathSegment::Index(1),
825                GraphQLErrorPathSegment::String("field1".to_string())
826            ]
827        );
828    }
829}