hive_router_plan_executor/execution/
plan.rs

1use std::collections::{BTreeSet, HashMap};
2
3use bytes::{BufMut, Bytes};
4use futures::{future::BoxFuture, stream::FuturesUnordered, StreamExt};
5use hive_router_query_planner::planner::plan_nodes::{
6    ConditionNode, FetchNode, FetchRewrite, FlattenNode, FlattenNodePath, ParallelNode, PlanNode,
7    QueryPlan, SequenceNode,
8};
9use http::HeaderMap;
10use serde::Deserialize;
11use sonic_rs::ValueRef;
12
13use crate::{
14    context::ExecutionContext,
15    execution::{
16        client_request_details::ClientRequestDetails,
17        error::{IntoPlanExecutionError, LazyPlanContext, PlanExecutionError},
18        jwt_forward::JwtAuthForwardingPlan,
19        rewrites::FetchRewriteExt,
20    },
21    executors::{
22        common::{HttpExecutionResponse, SubgraphExecutionRequest},
23        map::SubgraphExecutorMap,
24    },
25    headers::{
26        plan::HeaderRulesPlan,
27        request::modify_subgraph_request_headers,
28        response::{apply_subgraph_response_headers, modify_client_response_headers},
29    },
30    introspection::{
31        resolve::{resolve_introspection, IntrospectionContext},
32        schema::SchemaMetadata,
33    },
34    projection::{
35        plan::FieldProjectionPlan,
36        request::{project_requires, RequestProjectionContext},
37        response::project_by_operation,
38    },
39    response::{
40        graphql_error::{GraphQLError, GraphQLErrorExtensions, GraphQLErrorPath},
41        merge::deep_merge,
42        subgraph_response::SubgraphResponse,
43        value::Value,
44    },
45    utils::{
46        consts::{CLOSE_BRACKET, OPEN_BRACKET},
47        traverse::{traverse_and_callback, traverse_and_callback_mut},
48    },
49};
50
/// Inputs consumed by `execute_query_plan` for a single plan execution.
///
/// Most fields are borrowed for `'exec`; `extensions` and `initial_errors` are owned
/// by this struct.
pub struct QueryPlanExecutionContext<'exec, 'req> {
    /// The plan to run. Its optional root `node` is what the `Executor` executes.
    pub query_plan: &'exec QueryPlan,
    /// Projection plan handed to `project_by_operation`. When it is empty (and there is
    /// no introspection query) the response is seeded with `Value::Null`.
    pub projection_plan: &'exec Vec<FieldProjectionPlan>,
    /// Header rules given to the `Executor` for subgraph requests and responses.
    pub headers_plan: &'exec HeaderRulesPlan,
    /// Operation variables; used for condition nodes, per-fetch variable selection
    /// and response projection.
    pub variable_values: &'exec Option<HashMap<String, sonic_rs::Value>>,
    /// Passed by reference to `project_by_operation` when the response is built.
    pub extensions: Option<HashMap<String, sonic_rs::Value>>,
    /// Details of the incoming client request, forwarded to the `Executor`.
    pub client_request: &'exec ClientRequestDetails<'exec, 'req>,
    /// Supplies the optional introspection `query` and the schema `metadata`.
    pub introspection_context: &'exec IntrospectionContext<'exec, 'static>,
    /// Compared against `"Query"` to decide whether subgraph requests are deduplicated;
    /// also passed to `project_by_operation`.
    pub operation_type_name: &'exec str,
    /// Subgraph executors used to perform fetches.
    pub executors: &'exec SubgraphExecutorMap,
    /// Optional JWT forwarding plan; when present its extension field is added to
    /// every subgraph request.
    pub jwt_auth_forwarding: &'exec Option<JwtAuthForwardingPlan>,
    /// Errors the `ExecutionContext` is seeded with before the plan runs.
    pub initial_errors: Vec<GraphQLError>,
}
64
/// Result of `execute_query_plan`.
pub struct PlanExecutionOutput {
    /// Response body as returned by `project_by_operation`.
    pub body: Vec<u8>,
    /// Client response headers filled in by `modify_client_response_headers`.
    pub headers: HeaderMap,
    /// Number of errors held by the execution context right before projection
    /// (kept for usage reporting).
    pub error_count: usize,
}
70
71pub async fn execute_query_plan<'exec, 'req>(
72    ctx: QueryPlanExecutionContext<'exec, 'req>,
73) -> Result<PlanExecutionOutput, PlanExecutionError> {
74    let init_value = if let Some(introspection_query) = ctx.introspection_context.query {
75        resolve_introspection(introspection_query, ctx.introspection_context)
76    } else if ctx.projection_plan.is_empty() {
77        Value::Null
78    } else {
79        Value::Object(Vec::new())
80    };
81
82    let mut exec_ctx = ExecutionContext::new(ctx.query_plan, init_value, ctx.initial_errors);
83    let executor = Executor::new(
84        ctx.variable_values,
85        ctx.executors,
86        ctx.introspection_context.metadata,
87        ctx.client_request,
88        ctx.headers_plan,
89        ctx.jwt_auth_forwarding,
90        // Deduplicate subgraph requests only if the operation type is a query
91        ctx.operation_type_name == "Query",
92    );
93
94    if ctx.query_plan.node.is_some() {
95        executor
96            .execute(&mut exec_ctx, ctx.query_plan.node.as_ref())
97            .await?;
98    }
99
100    let mut response_headers = HeaderMap::new();
101    modify_client_response_headers(exec_ctx.response_headers_aggregator, &mut response_headers)
102        .with_plan_context(LazyPlanContext {
103            subgraph_name: || None,
104            affected_path: || None,
105        })?;
106
107    let final_response = &exec_ctx.final_response;
108    let error_count = exec_ctx.errors.len(); // Added for usage reporting
109    let body = project_by_operation(
110        final_response,
111        exec_ctx.errors,
112        &ctx.extensions,
113        ctx.operation_type_name,
114        ctx.projection_plan,
115        ctx.variable_values,
116        exec_ctx.response_storage.estimate_final_response_size(),
117        ctx.introspection_context.metadata,
118    )
119    .with_plan_context(LazyPlanContext {
120        subgraph_name: || None,
121        affected_path: || None,
122    })?;
123
124    Ok(PlanExecutionOutput {
125        body,
126        headers: response_headers,
127        error_count,
128    })
129}
130
/// Walks a query plan and performs the subgraph fetches it describes.
///
/// All fields are borrowed inputs supplied through `Executor::new`.
pub struct Executor<'exec, 'req> {
    /// Operation variables; used to pick condition branches and per-fetch variables.
    variable_values: &'exec Option<HashMap<String, sonic_rs::Value>>,
    /// Schema metadata used for path traversal and for applying fetch rewrites.
    schema_metadata: &'exec SchemaMetadata,
    /// Executors that perform the actual subgraph requests.
    executors: &'exec SubgraphExecutorMap,
    /// Details of the incoming client request.
    client_request: &'exec ClientRequestDetails<'exec, 'req>,
    /// Header rules applied to subgraph requests and subgraph responses.
    headers_plan: &'exec HeaderRulesPlan,
    /// When present, its extension field is added to every subgraph request.
    jwt_forwarding_plan: &'exec Option<JwtAuthForwardingPlan>,
    /// Copied into the `dedupe` flag of every subgraph request.
    dedupe_subgraph_requests: bool,
}
140
141struct ConcurrencyScope<'exec, T> {
142    jobs: FuturesUnordered<BoxFuture<'exec, T>>,
143}
144
145impl<'exec, T> ConcurrencyScope<'exec, T> {
146    fn new() -> Self {
147        Self {
148            jobs: FuturesUnordered::new(),
149        }
150    }
151
152    fn spawn(&mut self, future: BoxFuture<'exec, T>) {
153        self.jobs.push(future);
154    }
155
156    async fn join_all(mut self) -> Vec<T> {
157        let mut results = Vec::with_capacity(self.jobs.len());
158        while let Some(result) = self.jobs.next().await {
159            results.push(result);
160        }
161        results
162    }
163}
164
/// The parts of a subgraph response the executor keeps: raw body and headers.
struct SubgraphOutput {
    /// Raw response bytes; deserialized later by `process_subgraph_response`.
    body: Bytes,
    /// Response headers; fed to `apply_subgraph_response_headers`.
    headers: HeaderMap,
}
169
/// Outcome of executing a plain fetch node.
struct FetchJob {
    /// Id of the fetch node; used to look up its output rewrites.
    fetch_node_id: i64,
    /// Name of the subgraph (the fetch node's `service_name`) that was called.
    subgraph_name: String,
    /// The subgraph's response.
    response: SubgraphOutput,
}
175
/// Outcome of executing the fetch wrapped by a flatten node.
struct FlattenFetchJob {
    /// Path of the flatten node; entities are merged back at this path.
    flatten_node_path: FlattenNodePath,
    /// The subgraph's response.
    response: SubgraphOutput,
    /// Id of the wrapped fetch node; used to look up its output rewrites.
    fetch_node_id: i64,
    /// Name of the subgraph (the fetch node's `service_name`) that was called.
    subgraph_name: String,
    /// Hashes recorded by `prepare_flatten_data`, consumed by position while merging.
    representation_hashes: Vec<u64>,
    /// Maps a representation hash to the index used to pick the returned entity.
    representation_hash_to_index: HashMap<u64, usize>,
}
184
/// Result of running one plan node, ready to be merged by `process_job_result`.
enum ExecutionJob {
    /// A plain fetch finished.
    Fetch(FetchJob),
    /// A fetch wrapped in a flatten node finished.
    FlattenFetch(FlattenFetchJob),
    /// There was nothing to run (e.g. no representations, no selected condition
    /// branch, or an unsupported node); merging it is a no-op.
    None,
}
190
191impl From<ExecutionJob> for SubgraphOutput {
192    fn from(value: ExecutionJob) -> Self {
193        match value {
194            ExecutionJob::Fetch(j) => Self {
195                body: j.response.body,
196                headers: j.response.headers,
197            },
198            ExecutionJob::FlattenFetch(j) => Self {
199                body: j.response.body,
200                headers: j.response.headers,
201            },
202            ExecutionJob::None => Self {
203                body: Bytes::new(),
204                headers: HeaderMap::new(),
205            },
206        }
207    }
208}
209
210impl From<HttpExecutionResponse> for SubgraphOutput {
211    fn from(res: HttpExecutionResponse) -> Self {
212        Self {
213            body: res.body,
214            headers: res.headers,
215        }
216    }
217}
218
/// Data produced by `prepare_flatten_data` for one flatten node.
struct PreparedFlattenData {
    /// Serialized representations: a bracket-delimited buffer written by `project_requires`.
    representations: Vec<u8>,
    /// Hash of every non-null entity visited, in visit order.
    representation_hashes: Vec<u64>,
    /// Maps the hash of each projected entity to its recorded representation index.
    representation_hash_to_index: HashMap<u64, usize>,
}
224
225impl<'exec, 'req> Executor<'exec, 'req> {
226    pub fn new(
227        variable_values: &'exec Option<HashMap<String, sonic_rs::Value>>,
228        executors: &'exec SubgraphExecutorMap,
229        schema_metadata: &'exec SchemaMetadata,
230        client_request: &'exec ClientRequestDetails<'exec, 'req>,
231        headers_plan: &'exec HeaderRulesPlan,
232        jwt_forwarding_plan: &'exec Option<JwtAuthForwardingPlan>,
233        dedupe_subgraph_requests: bool,
234    ) -> Self {
235        Executor {
236            variable_values,
237            executors,
238            schema_metadata,
239            client_request,
240            headers_plan,
241            dedupe_subgraph_requests,
242            jwt_forwarding_plan,
243        }
244    }
245
246    pub async fn execute(
247        &self,
248        ctx: &mut ExecutionContext<'exec>,
249        plan: Option<&PlanNode>,
250    ) -> Result<(), PlanExecutionError> {
251        match plan {
252            Some(PlanNode::Fetch(node)) => self.execute_fetch_wave(ctx, node).await,
253            Some(PlanNode::Parallel(node)) => self.execute_parallel_wave(ctx, node).await,
254            Some(PlanNode::Sequence(node)) => self.execute_sequence_wave(ctx, node).await,
255            // Plans produced by our Query Planner can only start with: Fetch, Sequence or Parallel.
256            // Any other node type at the root is not supported, do nothing
257            Some(_) => Ok(()),
258            // An empty plan is valid, just do nothing
259            None => Ok(()),
260        }
261    }
262
263    async fn execute_fetch_wave(
264        &self,
265        ctx: &mut ExecutionContext<'exec>,
266        node: &FetchNode,
267    ) -> Result<(), PlanExecutionError> {
268        match self.execute_fetch_node(node, None).await {
269            Ok(result) => self.process_job_result(ctx, result),
270            Err(err) => {
271                self.log_error(&err);
272                ctx.errors.push(err.into());
273                Ok(())
274            }
275        }
276    }
277
278    async fn execute_sequence_wave(
279        &self,
280        ctx: &mut ExecutionContext<'exec>,
281        node: &SequenceNode,
282    ) -> Result<(), PlanExecutionError> {
283        for child in &node.nodes {
284            Box::pin(self.execute_plan_node(ctx, child)).await?;
285        }
286
287        Ok(())
288    }
289
290    async fn execute_parallel_wave(
291        &self,
292        ctx: &mut ExecutionContext<'exec>,
293        node: &ParallelNode,
294    ) -> Result<(), PlanExecutionError> {
295        let mut scope = ConcurrencyScope::new();
296
297        for child in &node.nodes {
298            let job_future = self.prepare_job_future(child, &ctx.final_response);
299            scope.spawn(job_future);
300        }
301
302        let results = scope.join_all().await;
303
304        for result in results {
305            match result {
306                Ok(job) => {
307                    self.process_job_result(ctx, job)?;
308                }
309                Err(err) => {
310                    self.log_error(&err);
311                    ctx.errors.push(err.into())
312                }
313            }
314        }
315
316        Ok(())
317    }
318
319    async fn execute_plan_node(
320        &self,
321        ctx: &mut ExecutionContext<'exec>,
322        node: &PlanNode,
323    ) -> Result<(), PlanExecutionError> {
324        match node {
325            PlanNode::Fetch(fetch_node) => match self.execute_fetch_node(fetch_node, None).await {
326                Ok(job) => {
327                    self.process_job_result(ctx, job)?;
328                }
329                Err(err) => {
330                    self.log_error(&err);
331                    ctx.errors.push(err.into());
332                }
333            },
334            PlanNode::Parallel(parallel_node) => {
335                self.execute_parallel_wave(ctx, parallel_node).await?;
336            }
337            PlanNode::Flatten(flatten_node) => {
338                match self.prepare_flatten_data(&ctx.final_response, flatten_node) {
339                    Ok(Some(p)) => {
340                        match self
341                            .execute_flatten_fetch_node(
342                                flatten_node,
343                                Some(p.representations),
344                                Some(p.representation_hashes),
345                                Some(p.representation_hash_to_index),
346                            )
347                            .await
348                        {
349                            Ok(job) => {
350                                self.process_job_result(ctx, job)?;
351                            }
352                            Err(err) => {
353                                self.log_error(&err);
354                                ctx.errors.push(err.into());
355                            }
356                        }
357                    }
358                    Ok(None) => { /* do nothing */ }
359                    Err(err) => {
360                        self.log_error(&err);
361                        ctx.errors.push(err.into());
362                    }
363                }
364            }
365            PlanNode::Sequence(sequence_node) => {
366                self.execute_sequence_wave(ctx, sequence_node).await?;
367            }
368            PlanNode::Condition(condition_node) => {
369                if let Some(node) =
370                    condition_node_by_variables(condition_node, self.variable_values)
371                {
372                    Box::pin(self.execute_plan_node(ctx, node)).await?;
373                }
374            }
375            // An unsupported plan node was found, do nothing.
376            _ => {}
377        }
378
379        Ok(())
380    }
381
382    fn prepare_job_future<'wave>(
383        &'wave self,
384        node: &'wave PlanNode,
385        final_response: &Value<'exec>,
386    ) -> BoxFuture<'wave, Result<ExecutionJob, PlanExecutionError>> {
387        match node {
388            PlanNode::Fetch(fetch_node) => Box::pin(self.execute_fetch_node(fetch_node, None)),
389            PlanNode::Flatten(flatten_node) => {
390                match self.prepare_flatten_data(final_response, flatten_node) {
391                    Ok(Some(p)) => Box::pin(self.execute_flatten_fetch_node(
392                        flatten_node,
393                        Some(p.representations),
394                        Some(p.representation_hashes),
395                        Some(p.representation_hash_to_index),
396                    )),
397                    Ok(None) => Box::pin(async { Ok(ExecutionJob::None) }),
398                    Err(e) => Box::pin(async move { Err(e) }),
399                }
400            }
401            PlanNode::Condition(node) => {
402                match condition_node_by_variables(node, self.variable_values) {
403                    Some(node) => Box::pin(self.prepare_job_future(node, final_response)), // This is already clean.
404                    None => Box::pin(async { Ok(ExecutionJob::None) }),
405                }
406            }
407            // Our Query Planner does not produce any other plan node types in ParallelNode
408            _ => Box::pin(async { Ok(ExecutionJob::None) }),
409        }
410    }
411
412    fn process_subgraph_response(
413        &self,
414        subgraph_name: &str,
415        ctx: &mut ExecutionContext<'exec>,
416        response_bytes: Bytes,
417        fetch_node_id: i64,
418    ) -> Option<(SubgraphResponse<'exec>, Option<&'exec Vec<FetchRewrite>>)> {
419        let idx = ctx.response_storage.add_response(response_bytes);
420        // SAFETY: The `bytes` are transmuted to the lifetime `'a` of the `ExecutionContext`.
421        // This is safe because the `response_storage` is part of the `ExecutionContext` (`ctx`)
422        // and will live as long as `'a`. The `Bytes` are stored in an `Arc`, so they won't be
423        // dropped until all references are gone. The `Value`s deserialized from this byte
424        // slice will borrow from it, and they are stored in `ctx.final_response`, which also
425        // lives for `'a`.
426        let bytes: &'exec [u8] =
427            unsafe { std::mem::transmute(ctx.response_storage.get_bytes(idx)) };
428
429        // SAFETY: The `output_rewrites` are transmuted to the lifetime `'a`. This is safe
430        // because `output_rewrites` is part of `OutputRewritesStorage` which is owned by
431        // `ExecutionContext` and lives for `'a`.
432        let output_rewrites: Option<&'exec Vec<FetchRewrite>> =
433            unsafe { std::mem::transmute(ctx.output_rewrites.get(fetch_node_id)) };
434
435        let mut deserializer = sonic_rs::Deserializer::from_slice(bytes);
436        let response = match SubgraphResponse::deserialize(&mut deserializer) {
437            Ok(response) => response,
438            Err(e) => {
439                let message = format!("Failed to deserialize subgraph response: {}", e);
440                let extensions = GraphQLErrorExtensions::new_from_code_and_service_name(
441                    "SUBGRAPH_RESPONSE_DESERIALIZATION_FAILED",
442                    subgraph_name,
443                );
444                let error = GraphQLError::from_message_and_extensions(message, extensions);
445
446                ctx.errors.push(error);
447                return None;
448            }
449        };
450
451        Some((response, output_rewrites))
452    }
453
454    fn process_job_result(
455        &self,
456        ctx: &mut ExecutionContext<'exec>,
457        job: ExecutionJob,
458    ) -> Result<(), PlanExecutionError> {
459        let _: () = match job {
460            ExecutionJob::Fetch(job) => {
461                apply_subgraph_response_headers(
462                    self.headers_plan,
463                    &job.subgraph_name,
464                    &job.response.headers,
465                    self.client_request,
466                    &mut ctx.response_headers_aggregator,
467                )
468                .with_plan_context(LazyPlanContext {
469                    subgraph_name: || Some(job.subgraph_name.clone()),
470                    affected_path: || None,
471                })?;
472
473                if let Some((mut response, output_rewrites)) = self.process_subgraph_response(
474                    job.subgraph_name.as_ref(),
475                    ctx,
476                    job.response.body,
477                    job.fetch_node_id,
478                ) {
479                    ctx.handle_errors(&job.subgraph_name, None, response.errors, None);
480                    if let Some(output_rewrites) = output_rewrites {
481                        for output_rewrite in output_rewrites {
482                            output_rewrite
483                                .rewrite(&self.schema_metadata.possible_types, &mut response.data);
484                        }
485                    }
486
487                    deep_merge(&mut ctx.final_response, response.data);
488                }
489            }
490            ExecutionJob::FlattenFetch(job) => {
491                apply_subgraph_response_headers(
492                    self.headers_plan,
493                    &job.subgraph_name,
494                    &job.response.headers,
495                    self.client_request,
496                    &mut ctx.response_headers_aggregator,
497                )
498                .with_plan_context(LazyPlanContext {
499                    subgraph_name: || Some(job.subgraph_name.clone()),
500                    affected_path: || Some(job.flatten_node_path.to_string()),
501                })?;
502
503                if let Some((mut response, output_rewrites)) = self.process_subgraph_response(
504                    &job.subgraph_name,
505                    ctx,
506                    job.response.body,
507                    job.fetch_node_id,
508                ) {
509                    if let Some(mut entities) = response.data.take_entities() {
510                        if let Some(output_rewrites) = output_rewrites {
511                            for output_rewrite in output_rewrites {
512                                for entity in &mut entities {
513                                    output_rewrite
514                                        .rewrite(&self.schema_metadata.possible_types, entity);
515                                }
516                            }
517                        }
518
519                        let mut index = 0;
520                        let normalized_path = job.flatten_node_path.as_slice();
521                        // If there is an error in the response, then collect the paths for normalizing the error
522                        let initial_error_path = response
523                            .errors
524                            .as_ref()
525                            .map(|_| GraphQLErrorPath::with_capacity(normalized_path.len() + 2));
526                        let mut entity_index_error_map = response
527                            .errors
528                            .as_ref()
529                            .map(|_| HashMap::with_capacity(entities.len()));
530                        traverse_and_callback_mut(
531                            &mut ctx.final_response,
532                            normalized_path,
533                            self.schema_metadata,
534                            initial_error_path,
535                            &mut |target, error_path| {
536                                let hash = job.representation_hashes[index];
537                                if let Some(entity_index) =
538                                    job.representation_hash_to_index.get(&hash)
539                                {
540                                    if let (Some(error_path), Some(entity_index_error_map)) =
541                                        (error_path, entity_index_error_map.as_mut())
542                                    {
543                                        let error_paths = entity_index_error_map
544                                            .entry(entity_index)
545                                            .or_insert_with(Vec::new);
546                                        error_paths.push(error_path);
547                                    }
548                                    if let Some(entity) = entities.get(*entity_index) {
549                                        // SAFETY: `new_val` is a clone of an entity that lives for `'a`.
550                                        // The transmute is to satisfy the compiler, but the lifetime
551                                        // is valid.
552                                        let new_val: Value<'_> =
553                                            unsafe { std::mem::transmute(entity.clone()) };
554                                        deep_merge(target, new_val);
555                                    }
556                                }
557                                index += 1;
558                            },
559                        );
560                        ctx.handle_errors(
561                            &job.subgraph_name,
562                            Some(job.flatten_node_path.to_string()),
563                            response.errors,
564                            entity_index_error_map,
565                        );
566                    } else if let Some(errors) = response.errors {
567                        // No entities were returned, but there are errors to handle.
568                        // We associate them with the flattened path and subgraph.
569                        let affected_path = job.flatten_node_path.to_string();
570                        ctx.errors.extend(errors.into_iter().map(|e| {
571                            e.add_subgraph_name(&job.subgraph_name)
572                                .add_affected_path(affected_path.clone())
573                        }));
574                    }
575                }
576            }
577            ExecutionJob::None => {
578                // nothing to do
579            }
580        };
581        Ok(())
582    }
583
584    fn prepare_flatten_data(
585        &self,
586        final_response: &Value<'exec>,
587        flatten_node: &FlattenNode,
588    ) -> Result<Option<PreparedFlattenData>, PlanExecutionError> {
589        let fetch_node = match flatten_node.node.as_ref() {
590            PlanNode::Fetch(fetch_node) => fetch_node,
591            _ => return Ok(None),
592        };
593        let requires_nodes = match fetch_node.requires.as_ref() {
594            Some(nodes) => nodes,
595            None => return Ok(None),
596        };
597
598        let mut index = 0;
599        let normalized_path = flatten_node.path.as_slice();
600        let mut filtered_representations = Vec::new();
601        filtered_representations.put(OPEN_BRACKET);
602        let proj_ctx = RequestProjectionContext::new(&self.schema_metadata.possible_types);
603        let mut representation_hashes: Vec<u64> = Vec::new();
604        let mut filtered_representations_hashes: HashMap<u64, usize> = HashMap::new();
605        let arena = bumpalo::Bump::new();
606
607        traverse_and_callback(
608            final_response,
609            normalized_path,
610            self.schema_metadata,
611            &mut |entity| {
612                let hash = entity.to_hash(&requires_nodes.items, proj_ctx.possible_types);
613
614                if !entity.is_null() {
615                    representation_hashes.push(hash);
616                }
617
618                if filtered_representations_hashes.contains_key(&hash) {
619                    return Ok::<(), PlanExecutionError>(());
620                }
621
622                let entity = if let Some(input_rewrites) = &fetch_node.input_rewrites {
623                    let new_entity = arena.alloc(entity.clone());
624                    for input_rewrite in input_rewrites {
625                        input_rewrite.rewrite(&self.schema_metadata.possible_types, new_entity);
626                    }
627                    new_entity
628                } else {
629                    entity
630                };
631
632                let is_projected = project_requires(
633                    &proj_ctx,
634                    &requires_nodes.items,
635                    entity,
636                    &mut filtered_representations,
637                    filtered_representations_hashes.is_empty(),
638                    None,
639                )
640                .with_plan_context(LazyPlanContext {
641                    subgraph_name: || Some(fetch_node.service_name.clone()),
642                    affected_path: || Some(flatten_node.path.to_string()),
643                })?;
644
645                if is_projected {
646                    filtered_representations_hashes.insert(hash, index);
647                }
648
649                index += 1;
650
651                Ok(())
652            },
653        )?;
654        filtered_representations.put(CLOSE_BRACKET);
655
656        if filtered_representations_hashes.is_empty() {
657            return Ok(None);
658        }
659
660        Ok(Some(PreparedFlattenData {
661            representations: filtered_representations,
662            representation_hashes,
663            representation_hash_to_index: filtered_representations_hashes,
664        }))
665    }
666
667    async fn execute_flatten_fetch_node(
668        &self,
669        node: &FlattenNode,
670        representations: Option<Vec<u8>>,
671        representation_hashes: Option<Vec<u64>>,
672        filtered_representations_hashes: Option<HashMap<u64, usize>>,
673    ) -> Result<ExecutionJob, PlanExecutionError> {
674        Ok(match node.node.as_ref() {
675            PlanNode::Fetch(fetch_node) => ExecutionJob::FlattenFetch(FlattenFetchJob {
676                flatten_node_path: node.path.clone(),
677                response: self
678                    .execute_fetch_node(fetch_node, representations)
679                    .await?
680                    .into(),
681                fetch_node_id: fetch_node.id,
682                subgraph_name: fetch_node.service_name.clone(),
683                representation_hashes: representation_hashes.unwrap_or_default(),
684                representation_hash_to_index: filtered_representations_hashes.unwrap_or_default(),
685            }),
686            _ => ExecutionJob::None,
687        })
688    }
689
690    async fn execute_fetch_node(
691        &self,
692        node: &FetchNode,
693        representations: Option<Vec<u8>>,
694    ) -> Result<ExecutionJob, PlanExecutionError> {
695        // TODO: We could optimize header map creation by caching them per service name
696        let mut headers_map = HeaderMap::new();
697        modify_subgraph_request_headers(
698            self.headers_plan,
699            &node.service_name,
700            self.client_request,
701            &mut headers_map,
702        )
703        .with_plan_context(LazyPlanContext {
704            subgraph_name: || Some(node.service_name.clone()),
705            affected_path: || None,
706        })?;
707        let variable_refs =
708            select_fetch_variables(self.variable_values, node.variable_usages.as_ref());
709
710        let mut subgraph_request = SubgraphExecutionRequest {
711            query: node.operation.document_str.as_str(),
712            dedupe: self.dedupe_subgraph_requests,
713            operation_name: node.operation_name.as_deref(),
714            variables: variable_refs,
715            representations,
716            headers: headers_map,
717            extensions: None,
718        };
719
720        if let Some(jwt_forwarding_plan) = &self.jwt_forwarding_plan {
721            subgraph_request.add_request_extensions_field(
722                jwt_forwarding_plan.extension_field_name.clone(),
723                jwt_forwarding_plan.extension_field_value.clone(),
724            );
725        }
726
727        Ok(ExecutionJob::Fetch(FetchJob {
728            fetch_node_id: node.id,
729            subgraph_name: node.service_name.clone(),
730            response: self
731                .executors
732                .execute(&node.service_name, subgraph_request, self.client_request)
733                .await
734                .into(),
735        }))
736    }
737
738    fn log_error(&self, error: &PlanExecutionError) {
739        tracing::error!(
740            subgraph_name = error.subgraph_name(),
741            error = error as &dyn std::error::Error,
742            "Plan execution error"
743        );
744    }
745}
746
747fn condition_node_by_variables<'a>(
748    condition_node: &'a ConditionNode,
749    variable_values: &'a Option<HashMap<String, sonic_rs::Value>>,
750) -> Option<&'a PlanNode> {
751    let vars = variable_values.as_ref()?;
752    let value = vars.get(&condition_node.condition)?;
753    let condition_met = matches!(value.as_ref(), ValueRef::Bool(true));
754
755    if condition_met {
756        condition_node.if_clause.as_deref()
757    } else {
758        condition_node.else_clause.as_deref()
759    }
760}
761
762fn select_fetch_variables<'a>(
763    variable_values: &'a Option<HashMap<String, sonic_rs::Value>>,
764    variable_usages: Option<&BTreeSet<String>>,
765) -> Option<HashMap<&'a str, &'a sonic_rs::Value>> {
766    let values = variable_values.as_ref()?;
767
768    variable_usages.map(|variable_usages| {
769        variable_usages
770            .iter()
771            .filter_map(|var_name| {
772                values
773                    .get_key_value(var_name.as_str())
774                    .map(|(key, value)| (key.as_str(), value))
775            })
776            .collect()
777    })
778}
779
#[cfg(test)]
mod tests {
    use std::collections::{BTreeSet, HashMap};

    use sonic_rs::Value;

    use super::select_fetch_variables;
    use crate::{
        context::ExecutionContext,
        response::graphql_error::{
            GraphQLError, GraphQLErrorExtensions, GraphQLErrorPath, GraphQLErrorPathSegment,
        },
    };

    /// Builds a JSON value by parsing the decimal text of `n`.
    fn value_from_number(n: i32) -> Value {
        let text = n.to_string();
        sonic_rs::from_str(&text).unwrap()
    }

    /// Builds a present (`Some`) variables map from `(name, number)` pairs.
    fn variables_of(entries: &[(&str, i32)]) -> Option<HashMap<String, Value>> {
        Some(
            entries
                .iter()
                .map(|(name, number)| ((*name).to_owned(), value_from_number(*number)))
                .collect(),
        )
    }

    /// Builds a set of used variable names.
    fn usages_of(names: &[&str]) -> BTreeSet<String> {
        names.iter().map(|name| (*name).to_owned()).collect()
    }

    #[test]
    fn select_fetch_variables_only_used_variables() {
        let variable_values = variables_of(&[("used", 1), ("unused", 2)]);
        let usages = usages_of(&["used"]);

        let selected = select_fetch_variables(&variable_values, Some(&usages)).unwrap();

        assert_eq!(selected.len(), 1);
        assert!(selected.contains_key("used"));
        assert!(!selected.contains_key("unused"));
    }

    #[test]
    fn select_fetch_variables_ignores_missing_usage_entries() {
        let variable_values = variables_of(&[("present", 3)]);
        let usages = usages_of(&["present", "missing"]);

        let selected = select_fetch_variables(&variable_values, Some(&usages)).unwrap();

        assert_eq!(selected.len(), 1);
        assert!(selected.contains_key("present"));
        assert!(!selected.contains_key("missing"));
    }

    #[test]
    fn select_fetch_variables_for_no_usage_entries() {
        let variable_values = variables_of(&[("unused_1", 1), ("unused_2", 2)]);

        let selected = select_fetch_variables(&variable_values, None);

        assert!(selected.is_none());
    }

    /// The same entity appears under two different paths, `["a", 0]` and
    /// `["b", 1]`, and the subgraph response has an error for this entity,
    /// so the error should be duplicated for both paths.
    #[test]
    fn normalize_entity_errors_correctly() {
        let key = |name: &str| GraphQLErrorPathSegment::String(name.to_string());

        // Entity index 0 maps to both locations of the entity.
        let entity_paths = vec![
            GraphQLErrorPath {
                segments: vec![key("a"), GraphQLErrorPathSegment::Index(0)],
            },
            GraphQLErrorPath {
                segments: vec![key("b"), GraphQLErrorPathSegment::Index(1)],
            },
        ];
        let entity_index_error_map: HashMap<&usize, Vec<GraphQLErrorPath>> =
            HashMap::from([(&0, entity_paths)]);

        let subgraph_error = GraphQLError {
            message: "Error 1".to_string(),
            locations: None,
            path: Some(GraphQLErrorPath {
                segments: vec![
                    key("_entities"),
                    GraphQLErrorPathSegment::Index(0),
                    key("field1"),
                ],
            }),
            extensions: GraphQLErrorExtensions::default(),
        };

        let mut ctx = ExecutionContext::default();
        ctx.handle_errors(
            "subgraph_a",
            None,
            Some(vec![subgraph_error]),
            Some(entity_index_error_map),
        );

        // One normalized error per entity path, in the order the paths were given.
        let expected_paths = [
            vec![key("a"), GraphQLErrorPathSegment::Index(0), key("field1")],
            vec![key("b"), GraphQLErrorPathSegment::Index(1), key("field1")],
        ];

        assert_eq!(ctx.errors.len(), 2);
        for (position, expected_segments) in expected_paths.iter().enumerate() {
            let error = &ctx.errors[position];
            assert_eq!(error.message, "Error 1");
            assert_eq!(&error.path.as_ref().unwrap().segments, expected_segments);
        }
    }
}