hive_router_plan_executor/execution/
plan.rs

1use std::collections::{BTreeSet, HashMap};
2
3use bytes::{BufMut, Bytes};
4use futures::{future::BoxFuture, stream::FuturesUnordered, StreamExt};
5use hive_router_query_planner::planner::plan_nodes::{
6    ConditionNode, FetchNode, FetchRewrite, FlattenNode, FlattenNodePath, ParallelNode, PlanNode,
7    QueryPlan, SequenceNode,
8};
9use http::HeaderMap;
10use serde::Deserialize;
11use sonic_rs::ValueRef;
12
13use crate::{
14    context::ExecutionContext,
15    execution::{
16        client_request_details::ClientRequestDetails,
17        error::{IntoPlanExecutionError, LazyPlanContext, PlanExecutionError},
18        jwt_forward::JwtAuthForwardingPlan,
19        rewrites::FetchRewriteExt,
20    },
21    executors::{
22        common::{HttpExecutionResponse, SubgraphExecutionRequest},
23        map::SubgraphExecutorMap,
24    },
25    headers::{
26        plan::HeaderRulesPlan,
27        request::modify_subgraph_request_headers,
28        response::{apply_subgraph_response_headers, modify_client_response_headers},
29    },
30    introspection::{
31        resolve::{resolve_introspection, IntrospectionContext},
32        schema::SchemaMetadata,
33    },
34    projection::{
35        plan::FieldProjectionPlan,
36        request::{project_requires, RequestProjectionContext},
37        response::project_by_operation,
38    },
39    response::{
40        graphql_error::{GraphQLError, GraphQLErrorExtensions, GraphQLErrorPath},
41        merge::deep_merge,
42        subgraph_response::SubgraphResponse,
43        value::Value,
44    },
45    utils::{
46        consts::{CLOSE_BRACKET, OPEN_BRACKET},
47        traverse::{traverse_and_callback, traverse_and_callback_mut},
48    },
49};
50
51pub struct QueryPlanExecutionContext<'exec, 'req> {
52    pub query_plan: &'exec QueryPlan,
53    pub projection_plan: &'exec Vec<FieldProjectionPlan>,
54    pub headers_plan: &'exec HeaderRulesPlan,
55    pub variable_values: &'exec Option<HashMap<String, sonic_rs::Value>>,
56    pub extensions: Option<HashMap<String, sonic_rs::Value>>,
57    pub client_request: &'exec ClientRequestDetails<'exec, 'req>,
58    pub introspection_context: &'exec IntrospectionContext<'exec, 'static>,
59    pub operation_type_name: &'exec str,
60    pub executors: &'exec SubgraphExecutorMap,
61    pub jwt_auth_forwarding: &'exec Option<JwtAuthForwardingPlan>,
62    pub initial_errors: Vec<GraphQLError>,
63}
64
65pub struct PlanExecutionOutput {
66    pub body: Vec<u8>,
67    pub headers: HeaderMap,
68    pub error_count: usize,
69}
70
71pub async fn execute_query_plan<'exec, 'req>(
72    ctx: QueryPlanExecutionContext<'exec, 'req>,
73) -> Result<PlanExecutionOutput, PlanExecutionError> {
74    let init_value = if let Some(introspection_query) = ctx.introspection_context.query {
75        resolve_introspection(introspection_query, ctx.introspection_context)
76    } else if ctx.projection_plan.is_empty() {
77        Value::Null
78    } else {
79        Value::Object(Vec::new())
80    };
81
82    let mut exec_ctx = ExecutionContext::new(ctx.query_plan, init_value, ctx.initial_errors);
83    let executor = Executor::new(
84        ctx.variable_values,
85        ctx.executors,
86        ctx.introspection_context.metadata,
87        ctx.client_request,
88        ctx.headers_plan,
89        ctx.jwt_auth_forwarding,
90        // Deduplicate subgraph requests only if the operation type is a query
91        ctx.operation_type_name == "Query",
92    );
93
94    if ctx.query_plan.node.is_some() {
95        executor
96            .execute(&mut exec_ctx, ctx.query_plan.node.as_ref())
97            .await?;
98    }
99
100    let mut response_headers = HeaderMap::new();
101    modify_client_response_headers(exec_ctx.response_headers_aggregator, &mut response_headers)
102        .with_plan_context(LazyPlanContext {
103            subgraph_name: || None,
104            affected_path: || None,
105        })?;
106
107    let final_response = &exec_ctx.final_response;
108    let error_count = exec_ctx.errors.len(); // Added for usage reporting
109    let body = project_by_operation(
110        final_response,
111        exec_ctx.errors,
112        &ctx.extensions,
113        ctx.operation_type_name,
114        ctx.projection_plan,
115        ctx.variable_values,
116        exec_ctx.response_storage.estimate_final_response_size(),
117    )
118    .with_plan_context(LazyPlanContext {
119        subgraph_name: || None,
120        affected_path: || None,
121    })?;
122
123    Ok(PlanExecutionOutput {
124        body,
125        headers: response_headers,
126        error_count,
127    })
128}
129
130pub struct Executor<'exec, 'req> {
131    variable_values: &'exec Option<HashMap<String, sonic_rs::Value>>,
132    schema_metadata: &'exec SchemaMetadata,
133    executors: &'exec SubgraphExecutorMap,
134    client_request: &'exec ClientRequestDetails<'exec, 'req>,
135    headers_plan: &'exec HeaderRulesPlan,
136    jwt_forwarding_plan: &'exec Option<JwtAuthForwardingPlan>,
137    dedupe_subgraph_requests: bool,
138}
139
140struct ConcurrencyScope<'exec, T> {
141    jobs: FuturesUnordered<BoxFuture<'exec, T>>,
142}
143
144impl<'exec, T> ConcurrencyScope<'exec, T> {
145    fn new() -> Self {
146        Self {
147            jobs: FuturesUnordered::new(),
148        }
149    }
150
151    fn spawn(&mut self, future: BoxFuture<'exec, T>) {
152        self.jobs.push(future);
153    }
154
155    async fn join_all(mut self) -> Vec<T> {
156        let mut results = Vec::with_capacity(self.jobs.len());
157        while let Some(result) = self.jobs.next().await {
158            results.push(result);
159        }
160        results
161    }
162}
163
164struct SubgraphOutput {
165    body: Bytes,
166    headers: HeaderMap,
167}
168
169struct FetchJob {
170    fetch_node_id: i64,
171    subgraph_name: String,
172    response: SubgraphOutput,
173}
174
175struct FlattenFetchJob {
176    flatten_node_path: FlattenNodePath,
177    response: SubgraphOutput,
178    fetch_node_id: i64,
179    subgraph_name: String,
180    representation_hashes: Vec<u64>,
181    representation_hash_to_index: HashMap<u64, usize>,
182}
183
184enum ExecutionJob {
185    Fetch(FetchJob),
186    FlattenFetch(FlattenFetchJob),
187    None,
188}
189
190impl From<ExecutionJob> for SubgraphOutput {
191    fn from(value: ExecutionJob) -> Self {
192        match value {
193            ExecutionJob::Fetch(j) => Self {
194                body: j.response.body,
195                headers: j.response.headers,
196            },
197            ExecutionJob::FlattenFetch(j) => Self {
198                body: j.response.body,
199                headers: j.response.headers,
200            },
201            ExecutionJob::None => Self {
202                body: Bytes::new(),
203                headers: HeaderMap::new(),
204            },
205        }
206    }
207}
208
209impl From<HttpExecutionResponse> for SubgraphOutput {
210    fn from(res: HttpExecutionResponse) -> Self {
211        Self {
212            body: res.body,
213            headers: res.headers,
214        }
215    }
216}
217
218struct PreparedFlattenData {
219    representations: Vec<u8>,
220    representation_hashes: Vec<u64>,
221    representation_hash_to_index: HashMap<u64, usize>,
222}
223
224impl<'exec, 'req> Executor<'exec, 'req> {
225    pub fn new(
226        variable_values: &'exec Option<HashMap<String, sonic_rs::Value>>,
227        executors: &'exec SubgraphExecutorMap,
228        schema_metadata: &'exec SchemaMetadata,
229        client_request: &'exec ClientRequestDetails<'exec, 'req>,
230        headers_plan: &'exec HeaderRulesPlan,
231        jwt_forwarding_plan: &'exec Option<JwtAuthForwardingPlan>,
232        dedupe_subgraph_requests: bool,
233    ) -> Self {
234        Executor {
235            variable_values,
236            executors,
237            schema_metadata,
238            client_request,
239            headers_plan,
240            dedupe_subgraph_requests,
241            jwt_forwarding_plan,
242        }
243    }
244
245    pub async fn execute(
246        &self,
247        ctx: &mut ExecutionContext<'exec>,
248        plan: Option<&PlanNode>,
249    ) -> Result<(), PlanExecutionError> {
250        match plan {
251            Some(PlanNode::Fetch(node)) => self.execute_fetch_wave(ctx, node).await,
252            Some(PlanNode::Parallel(node)) => self.execute_parallel_wave(ctx, node).await,
253            Some(PlanNode::Sequence(node)) => self.execute_sequence_wave(ctx, node).await,
254            // Plans produced by our Query Planner can only start with: Fetch, Sequence or Parallel.
255            // Any other node type at the root is not supported, do nothing
256            Some(_) => Ok(()),
257            // An empty plan is valid, just do nothing
258            None => Ok(()),
259        }
260    }
261
262    async fn execute_fetch_wave(
263        &self,
264        ctx: &mut ExecutionContext<'exec>,
265        node: &FetchNode,
266    ) -> Result<(), PlanExecutionError> {
267        match self.execute_fetch_node(node, None).await {
268            Ok(result) => self.process_job_result(ctx, result),
269            Err(err) => {
270                self.log_error(&err);
271                ctx.errors.push(err.into());
272                Ok(())
273            }
274        }
275    }
276
277    async fn execute_sequence_wave(
278        &self,
279        ctx: &mut ExecutionContext<'exec>,
280        node: &SequenceNode,
281    ) -> Result<(), PlanExecutionError> {
282        for child in &node.nodes {
283            Box::pin(self.execute_plan_node(ctx, child)).await?;
284        }
285
286        Ok(())
287    }
288
289    async fn execute_parallel_wave(
290        &self,
291        ctx: &mut ExecutionContext<'exec>,
292        node: &ParallelNode,
293    ) -> Result<(), PlanExecutionError> {
294        let mut scope = ConcurrencyScope::new();
295
296        for child in &node.nodes {
297            let job_future = self.prepare_job_future(child, &ctx.final_response);
298            scope.spawn(job_future);
299        }
300
301        let results = scope.join_all().await;
302
303        for result in results {
304            match result {
305                Ok(job) => {
306                    self.process_job_result(ctx, job)?;
307                }
308                Err(err) => {
309                    self.log_error(&err);
310                    ctx.errors.push(err.into())
311                }
312            }
313        }
314
315        Ok(())
316    }
317
318    async fn execute_plan_node(
319        &self,
320        ctx: &mut ExecutionContext<'exec>,
321        node: &PlanNode,
322    ) -> Result<(), PlanExecutionError> {
323        match node {
324            PlanNode::Fetch(fetch_node) => match self.execute_fetch_node(fetch_node, None).await {
325                Ok(job) => {
326                    self.process_job_result(ctx, job)?;
327                }
328                Err(err) => {
329                    self.log_error(&err);
330                    ctx.errors.push(err.into());
331                }
332            },
333            PlanNode::Parallel(parallel_node) => {
334                self.execute_parallel_wave(ctx, parallel_node).await?;
335            }
336            PlanNode::Flatten(flatten_node) => {
337                match self.prepare_flatten_data(&ctx.final_response, flatten_node) {
338                    Ok(Some(p)) => {
339                        match self
340                            .execute_flatten_fetch_node(
341                                flatten_node,
342                                Some(p.representations),
343                                Some(p.representation_hashes),
344                                Some(p.representation_hash_to_index),
345                            )
346                            .await
347                        {
348                            Ok(job) => {
349                                self.process_job_result(ctx, job)?;
350                            }
351                            Err(err) => {
352                                self.log_error(&err);
353                                ctx.errors.push(err.into());
354                            }
355                        }
356                    }
357                    Ok(None) => { /* do nothing */ }
358                    Err(err) => {
359                        self.log_error(&err);
360                        ctx.errors.push(err.into());
361                    }
362                }
363            }
364            PlanNode::Sequence(sequence_node) => {
365                self.execute_sequence_wave(ctx, sequence_node).await?;
366            }
367            PlanNode::Condition(condition_node) => {
368                if let Some(node) =
369                    condition_node_by_variables(condition_node, self.variable_values)
370                {
371                    Box::pin(self.execute_plan_node(ctx, node)).await?;
372                }
373            }
374            // An unsupported plan node was found, do nothing.
375            _ => {}
376        }
377
378        Ok(())
379    }
380
381    fn prepare_job_future<'wave>(
382        &'wave self,
383        node: &'wave PlanNode,
384        final_response: &Value<'exec>,
385    ) -> BoxFuture<'wave, Result<ExecutionJob, PlanExecutionError>> {
386        match node {
387            PlanNode::Fetch(fetch_node) => Box::pin(self.execute_fetch_node(fetch_node, None)),
388            PlanNode::Flatten(flatten_node) => {
389                match self.prepare_flatten_data(final_response, flatten_node) {
390                    Ok(Some(p)) => Box::pin(self.execute_flatten_fetch_node(
391                        flatten_node,
392                        Some(p.representations),
393                        Some(p.representation_hashes),
394                        Some(p.representation_hash_to_index),
395                    )),
396                    Ok(None) => Box::pin(async { Ok(ExecutionJob::None) }),
397                    Err(e) => Box::pin(async move { Err(e) }),
398                }
399            }
400            PlanNode::Condition(node) => {
401                match condition_node_by_variables(node, self.variable_values) {
402                    Some(node) => Box::pin(self.prepare_job_future(node, final_response)), // This is already clean.
403                    None => Box::pin(async { Ok(ExecutionJob::None) }),
404                }
405            }
406            // Our Query Planner does not produce any other plan node types in ParallelNode
407            _ => Box::pin(async { Ok(ExecutionJob::None) }),
408        }
409    }
410
411    fn process_subgraph_response(
412        &self,
413        subgraph_name: &str,
414        ctx: &mut ExecutionContext<'exec>,
415        response_bytes: Bytes,
416        fetch_node_id: i64,
417    ) -> Option<(SubgraphResponse<'exec>, Option<&'exec Vec<FetchRewrite>>)> {
418        let idx = ctx.response_storage.add_response(response_bytes);
419        // SAFETY: The `bytes` are transmuted to the lifetime `'a` of the `ExecutionContext`.
420        // This is safe because the `response_storage` is part of the `ExecutionContext` (`ctx`)
421        // and will live as long as `'a`. The `Bytes` are stored in an `Arc`, so they won't be
422        // dropped until all references are gone. The `Value`s deserialized from this byte
423        // slice will borrow from it, and they are stored in `ctx.final_response`, which also
424        // lives for `'a`.
425        let bytes: &'exec [u8] =
426            unsafe { std::mem::transmute(ctx.response_storage.get_bytes(idx)) };
427
428        // SAFETY: The `output_rewrites` are transmuted to the lifetime `'a`. This is safe
429        // because `output_rewrites` is part of `OutputRewritesStorage` which is owned by
430        // `ExecutionContext` and lives for `'a`.
431        let output_rewrites: Option<&'exec Vec<FetchRewrite>> =
432            unsafe { std::mem::transmute(ctx.output_rewrites.get(fetch_node_id)) };
433
434        let mut deserializer = sonic_rs::Deserializer::from_slice(bytes);
435        let response = match SubgraphResponse::deserialize(&mut deserializer) {
436            Ok(response) => response,
437            Err(e) => {
438                let message = format!("Failed to deserialize subgraph response: {}", e);
439                let extensions = GraphQLErrorExtensions::new_from_code_and_service_name(
440                    "SUBGRAPH_RESPONSE_DESERIALIZATION_FAILED",
441                    subgraph_name,
442                );
443                let error = GraphQLError::from_message_and_extensions(message, extensions);
444
445                ctx.errors.push(error);
446                return None;
447            }
448        };
449
450        Some((response, output_rewrites))
451    }
452
453    fn process_job_result(
454        &self,
455        ctx: &mut ExecutionContext<'exec>,
456        job: ExecutionJob,
457    ) -> Result<(), PlanExecutionError> {
458        let _: () = match job {
459            ExecutionJob::Fetch(job) => {
460                apply_subgraph_response_headers(
461                    self.headers_plan,
462                    &job.subgraph_name,
463                    &job.response.headers,
464                    self.client_request,
465                    &mut ctx.response_headers_aggregator,
466                )
467                .with_plan_context(LazyPlanContext {
468                    subgraph_name: || Some(job.subgraph_name.clone()),
469                    affected_path: || None,
470                })?;
471
472                if let Some((mut response, output_rewrites)) = self.process_subgraph_response(
473                    job.subgraph_name.as_ref(),
474                    ctx,
475                    job.response.body,
476                    job.fetch_node_id,
477                ) {
478                    ctx.handle_errors(&job.subgraph_name, None, response.errors, None);
479                    if let Some(output_rewrites) = output_rewrites {
480                        for output_rewrite in output_rewrites {
481                            output_rewrite
482                                .rewrite(&self.schema_metadata.possible_types, &mut response.data);
483                        }
484                    }
485
486                    deep_merge(&mut ctx.final_response, response.data);
487                }
488            }
489            ExecutionJob::FlattenFetch(job) => {
490                apply_subgraph_response_headers(
491                    self.headers_plan,
492                    &job.subgraph_name,
493                    &job.response.headers,
494                    self.client_request,
495                    &mut ctx.response_headers_aggregator,
496                )
497                .with_plan_context(LazyPlanContext {
498                    subgraph_name: || Some(job.subgraph_name.clone()),
499                    affected_path: || Some(job.flatten_node_path.to_string()),
500                })?;
501
502                if let Some((mut response, output_rewrites)) = self.process_subgraph_response(
503                    &job.subgraph_name,
504                    ctx,
505                    job.response.body,
506                    job.fetch_node_id,
507                ) {
508                    if let Some(mut entities) = response.data.take_entities() {
509                        if let Some(output_rewrites) = output_rewrites {
510                            for output_rewrite in output_rewrites {
511                                for entity in &mut entities {
512                                    output_rewrite
513                                        .rewrite(&self.schema_metadata.possible_types, entity);
514                                }
515                            }
516                        }
517
518                        let mut index = 0;
519                        let normalized_path = job.flatten_node_path.as_slice();
520                        // If there is an error in the response, then collect the paths for normalizing the error
521                        let initial_error_path = response
522                            .errors
523                            .as_ref()
524                            .map(|_| GraphQLErrorPath::with_capacity(normalized_path.len() + 2));
525                        let mut entity_index_error_map = response
526                            .errors
527                            .as_ref()
528                            .map(|_| HashMap::with_capacity(entities.len()));
529                        traverse_and_callback_mut(
530                            &mut ctx.final_response,
531                            normalized_path,
532                            self.schema_metadata,
533                            initial_error_path,
534                            &mut |target, error_path| {
535                                let hash = job.representation_hashes[index];
536                                if let Some(entity_index) =
537                                    job.representation_hash_to_index.get(&hash)
538                                {
539                                    if let (Some(error_path), Some(entity_index_error_map)) =
540                                        (error_path, entity_index_error_map.as_mut())
541                                    {
542                                        let error_paths = entity_index_error_map
543                                            .entry(entity_index)
544                                            .or_insert_with(Vec::new);
545                                        error_paths.push(error_path);
546                                    }
547                                    if let Some(entity) = entities.get(*entity_index) {
548                                        // SAFETY: `new_val` is a clone of an entity that lives for `'a`.
549                                        // The transmute is to satisfy the compiler, but the lifetime
550                                        // is valid.
551                                        let new_val: Value<'_> =
552                                            unsafe { std::mem::transmute(entity.clone()) };
553                                        deep_merge(target, new_val);
554                                    }
555                                }
556                                index += 1;
557                            },
558                        );
559                        ctx.handle_errors(
560                            &job.subgraph_name,
561                            Some(job.flatten_node_path.to_string()),
562                            response.errors,
563                            entity_index_error_map,
564                        );
565                    } else if let Some(errors) = response.errors {
566                        // No entities were returned, but there are errors to handle.
567                        // We associate them with the flattened path and subgraph.
568                        let affected_path = job.flatten_node_path.to_string();
569                        ctx.errors.extend(errors.into_iter().map(|e| {
570                            e.add_subgraph_name(&job.subgraph_name)
571                                .add_affected_path(affected_path.clone())
572                        }));
573                    }
574                }
575            }
576            ExecutionJob::None => {
577                // nothing to do
578            }
579        };
580        Ok(())
581    }
582
583    fn prepare_flatten_data(
584        &self,
585        final_response: &Value<'exec>,
586        flatten_node: &FlattenNode,
587    ) -> Result<Option<PreparedFlattenData>, PlanExecutionError> {
588        let fetch_node = match flatten_node.node.as_ref() {
589            PlanNode::Fetch(fetch_node) => fetch_node,
590            _ => return Ok(None),
591        };
592        let requires_nodes = match fetch_node.requires.as_ref() {
593            Some(nodes) => nodes,
594            None => return Ok(None),
595        };
596
597        let mut index = 0;
598        let normalized_path = flatten_node.path.as_slice();
599        let mut filtered_representations = Vec::new();
600        filtered_representations.put(OPEN_BRACKET);
601        let proj_ctx = RequestProjectionContext::new(&self.schema_metadata.possible_types);
602        let mut representation_hashes: Vec<u64> = Vec::new();
603        let mut filtered_representations_hashes: HashMap<u64, usize> = HashMap::new();
604        let arena = bumpalo::Bump::new();
605
606        traverse_and_callback(
607            final_response,
608            normalized_path,
609            self.schema_metadata,
610            &mut |entity| {
611                let hash = entity.to_hash(&requires_nodes.items, proj_ctx.possible_types);
612
613                if !entity.is_null() {
614                    representation_hashes.push(hash);
615                }
616
617                if filtered_representations_hashes.contains_key(&hash) {
618                    return Ok::<(), PlanExecutionError>(());
619                }
620
621                let entity = if let Some(input_rewrites) = &fetch_node.input_rewrites {
622                    let new_entity = arena.alloc(entity.clone());
623                    for input_rewrite in input_rewrites {
624                        input_rewrite.rewrite(&self.schema_metadata.possible_types, new_entity);
625                    }
626                    new_entity
627                } else {
628                    entity
629                };
630
631                let is_projected = project_requires(
632                    &proj_ctx,
633                    &requires_nodes.items,
634                    entity,
635                    &mut filtered_representations,
636                    filtered_representations_hashes.is_empty(),
637                    None,
638                )
639                .with_plan_context(LazyPlanContext {
640                    subgraph_name: || Some(fetch_node.service_name.clone()),
641                    affected_path: || Some(flatten_node.path.to_string()),
642                })?;
643
644                if is_projected {
645                    filtered_representations_hashes.insert(hash, index);
646                }
647
648                index += 1;
649
650                Ok(())
651            },
652        )?;
653        filtered_representations.put(CLOSE_BRACKET);
654
655        if filtered_representations_hashes.is_empty() {
656            return Ok(None);
657        }
658
659        Ok(Some(PreparedFlattenData {
660            representations: filtered_representations,
661            representation_hashes,
662            representation_hash_to_index: filtered_representations_hashes,
663        }))
664    }
665
666    async fn execute_flatten_fetch_node(
667        &self,
668        node: &FlattenNode,
669        representations: Option<Vec<u8>>,
670        representation_hashes: Option<Vec<u64>>,
671        filtered_representations_hashes: Option<HashMap<u64, usize>>,
672    ) -> Result<ExecutionJob, PlanExecutionError> {
673        Ok(match node.node.as_ref() {
674            PlanNode::Fetch(fetch_node) => ExecutionJob::FlattenFetch(FlattenFetchJob {
675                flatten_node_path: node.path.clone(),
676                response: self
677                    .execute_fetch_node(fetch_node, representations)
678                    .await?
679                    .into(),
680                fetch_node_id: fetch_node.id,
681                subgraph_name: fetch_node.service_name.clone(),
682                representation_hashes: representation_hashes.unwrap_or_default(),
683                representation_hash_to_index: filtered_representations_hashes.unwrap_or_default(),
684            }),
685            _ => ExecutionJob::None,
686        })
687    }
688
689    async fn execute_fetch_node(
690        &self,
691        node: &FetchNode,
692        representations: Option<Vec<u8>>,
693    ) -> Result<ExecutionJob, PlanExecutionError> {
694        // TODO: We could optimize header map creation by caching them per service name
695        let mut headers_map = HeaderMap::new();
696        modify_subgraph_request_headers(
697            self.headers_plan,
698            &node.service_name,
699            self.client_request,
700            &mut headers_map,
701        )
702        .with_plan_context(LazyPlanContext {
703            subgraph_name: || Some(node.service_name.clone()),
704            affected_path: || None,
705        })?;
706        let variable_refs =
707            select_fetch_variables(self.variable_values, node.variable_usages.as_ref());
708
709        let mut subgraph_request = SubgraphExecutionRequest {
710            query: node.operation.document_str.as_str(),
711            dedupe: self.dedupe_subgraph_requests,
712            operation_name: node.operation_name.as_deref(),
713            variables: variable_refs,
714            representations,
715            headers: headers_map,
716            extensions: None,
717        };
718
719        if let Some(jwt_forwarding_plan) = &self.jwt_forwarding_plan {
720            subgraph_request.add_request_extensions_field(
721                jwt_forwarding_plan.extension_field_name.clone(),
722                jwt_forwarding_plan.extension_field_value.clone(),
723            );
724        }
725
726        Ok(ExecutionJob::Fetch(FetchJob {
727            fetch_node_id: node.id,
728            subgraph_name: node.service_name.clone(),
729            response: self
730                .executors
731                .execute(&node.service_name, subgraph_request, self.client_request)
732                .await
733                .into(),
734        }))
735    }
736
737    fn log_error(&self, error: &PlanExecutionError) {
738        tracing::error!(
739            subgraph_name = error.subgraph_name(),
740            error = error as &dyn std::error::Error,
741            "Plan execution error"
742        );
743    }
744}
745
746fn condition_node_by_variables<'a>(
747    condition_node: &'a ConditionNode,
748    variable_values: &'a Option<HashMap<String, sonic_rs::Value>>,
749) -> Option<&'a PlanNode> {
750    let vars = variable_values.as_ref()?;
751    let value = vars.get(&condition_node.condition)?;
752    let condition_met = matches!(value.as_ref(), ValueRef::Bool(true));
753
754    if condition_met {
755        condition_node.if_clause.as_deref()
756    } else {
757        condition_node.else_clause.as_deref()
758    }
759}
760
761fn select_fetch_variables<'a>(
762    variable_values: &'a Option<HashMap<String, sonic_rs::Value>>,
763    variable_usages: Option<&BTreeSet<String>>,
764) -> Option<HashMap<&'a str, &'a sonic_rs::Value>> {
765    let values = variable_values.as_ref()?;
766
767    variable_usages.map(|variable_usages| {
768        variable_usages
769            .iter()
770            .filter_map(|var_name| {
771                values
772                    .get_key_value(var_name.as_str())
773                    .map(|(key, value)| (key.as_str(), value))
774            })
775            .collect()
776    })
777}
778
779#[cfg(test)]
780mod tests {
781    use crate::{
782        context::ExecutionContext,
783        response::graphql_error::{GraphQLErrorExtensions, GraphQLErrorPath},
784    };
785
786    use super::select_fetch_variables;
787    use sonic_rs::Value;
788    use std::collections::{BTreeSet, HashMap};
789
790    fn value_from_number(n: i32) -> Value {
791        sonic_rs::from_str(&n.to_string()).unwrap()
792    }
793
794    #[test]
795    fn select_fetch_variables_only_used_variables() {
796        let mut variable_values_map = HashMap::new();
797        variable_values_map.insert("used".to_string(), value_from_number(1));
798        variable_values_map.insert("unused".to_string(), value_from_number(2));
799        let variable_values = Some(variable_values_map);
800
801        let mut usages = BTreeSet::new();
802        usages.insert("used".to_string());
803
804        let selected = select_fetch_variables(&variable_values, Some(&usages)).unwrap();
805
806        assert_eq!(selected.len(), 1);
807        assert!(selected.contains_key("used"));
808        assert!(!selected.contains_key("unused"));
809    }
810
811    #[test]
812    fn select_fetch_variables_ignores_missing_usage_entries() {
813        let mut variable_values_map = HashMap::new();
814        variable_values_map.insert("present".to_string(), value_from_number(3));
815        let variable_values = Some(variable_values_map);
816
817        let mut usages = BTreeSet::new();
818        usages.insert("present".to_string());
819        usages.insert("missing".to_string());
820
821        let selected = select_fetch_variables(&variable_values, Some(&usages)).unwrap();
822
823        assert_eq!(selected.len(), 1);
824        assert!(selected.contains_key("present"));
825        assert!(!selected.contains_key("missing"));
826    }
827
828    #[test]
829    fn select_fetch_variables_for_no_usage_entries() {
830        let mut variable_values_map = HashMap::new();
831        variable_values_map.insert("unused_1".to_string(), value_from_number(1));
832        variable_values_map.insert("unused_2".to_string(), value_from_number(2));
833
834        let variable_values = Some(variable_values_map);
835
836        let selected = select_fetch_variables(&variable_values, None);
837
838        assert!(selected.is_none());
839    }
840    #[test]
841    /**
842     * We have the same entity in two different paths ["a", 0] and ["b", 1],
843     * and the subgraph response has an error for this entity.
844     * So we should duplicate the error for both paths.
845     */
846    fn normalize_entity_errors_correctly() {
847        use crate::response::graphql_error::{GraphQLError, GraphQLErrorPathSegment};
848        use std::collections::HashMap;
849        let mut ctx = ExecutionContext::default();
850        let mut entity_index_error_map: HashMap<&usize, Vec<GraphQLErrorPath>> = HashMap::new();
851        entity_index_error_map.insert(
852            &0,
853            vec![
854                GraphQLErrorPath {
855                    segments: vec![
856                        GraphQLErrorPathSegment::String("a".to_string()),
857                        GraphQLErrorPathSegment::Index(0),
858                    ],
859                },
860                GraphQLErrorPath {
861                    segments: vec![
862                        GraphQLErrorPathSegment::String("b".to_string()),
863                        GraphQLErrorPathSegment::Index(1),
864                    ],
865                },
866            ],
867        );
868        let response_errors = vec![GraphQLError {
869            message: "Error 1".to_string(),
870            locations: None,
871            path: Some(GraphQLErrorPath {
872                segments: vec![
873                    GraphQLErrorPathSegment::String("_entities".to_string()),
874                    GraphQLErrorPathSegment::Index(0),
875                    GraphQLErrorPathSegment::String("field1".to_string()),
876                ],
877            }),
878            extensions: GraphQLErrorExtensions::default(),
879        }];
880        ctx.handle_errors(
881            "subgraph_a",
882            None,
883            Some(response_errors),
884            Some(entity_index_error_map),
885        );
886        assert_eq!(ctx.errors.len(), 2);
887        assert_eq!(ctx.errors[0].message, "Error 1");
888        assert_eq!(
889            ctx.errors[0].path.as_ref().unwrap().segments,
890            vec![
891                GraphQLErrorPathSegment::String("a".to_string()),
892                GraphQLErrorPathSegment::Index(0),
893                GraphQLErrorPathSegment::String("field1".to_string())
894            ]
895        );
896        assert_eq!(ctx.errors[1].message, "Error 1");
897        assert_eq!(
898            ctx.errors[1].path.as_ref().unwrap().segments,
899            vec![
900                GraphQLErrorPathSegment::String("b".to_string()),
901                GraphQLErrorPathSegment::Index(1),
902                GraphQLErrorPathSegment::String("field1".to_string())
903            ]
904        );
905    }
906}