hive_router_plan_executor/execution/
plan.rs

1use std::collections::{BTreeSet, HashMap};
2
3use bytes::{BufMut, Bytes};
4use futures::{future::BoxFuture, stream::FuturesUnordered, StreamExt};
5use hive_router_query_planner::planner::plan_nodes::{
6    ConditionNode, FetchNode, FetchRewrite, FlattenNode, FlattenNodePath, ParallelNode, PlanNode,
7    QueryPlan, SequenceNode,
8};
9use http::HeaderMap;
10use serde::Deserialize;
11use sonic_rs::ValueRef;
12
13use crate::{
14    context::ExecutionContext,
15    execution::{
16        client_request_details::ClientRequestDetails,
17        error::{IntoPlanExecutionError, LazyPlanContext, PlanExecutionError},
18        jwt_forward::JwtAuthForwardingPlan,
19        rewrites::FetchRewriteExt,
20    },
21    executors::{
22        common::{HttpExecutionResponse, SubgraphExecutionRequest},
23        map::SubgraphExecutorMap,
24    },
25    headers::{
26        plan::HeaderRulesPlan,
27        request::modify_subgraph_request_headers,
28        response::{apply_subgraph_response_headers, modify_client_response_headers},
29    },
30    introspection::{
31        resolve::{resolve_introspection, IntrospectionContext},
32        schema::SchemaMetadata,
33    },
34    projection::{
35        plan::FieldProjectionPlan,
36        request::{project_requires, RequestProjectionContext},
37        response::project_by_operation,
38    },
39    response::{
40        graphql_error::{GraphQLError, GraphQLErrorExtensions, GraphQLErrorPath},
41        merge::deep_merge,
42        subgraph_response::SubgraphResponse,
43        value::Value,
44    },
45    utils::{
46        consts::{CLOSE_BRACKET, OPEN_BRACKET},
47        traverse::{traverse_and_callback, traverse_and_callback_mut},
48    },
49};
50
51pub struct QueryPlanExecutionContext<'exec, 'req> {
52    pub query_plan: &'exec QueryPlan,
53    pub projection_plan: &'exec Vec<FieldProjectionPlan>,
54    pub headers_plan: &'exec HeaderRulesPlan,
55    pub variable_values: &'exec Option<HashMap<String, sonic_rs::Value>>,
56    pub extensions: Option<HashMap<String, sonic_rs::Value>>,
57    pub client_request: &'exec ClientRequestDetails<'exec, 'req>,
58    pub introspection_context: &'exec IntrospectionContext<'exec, 'static>,
59    pub operation_type_name: &'exec str,
60    pub executors: &'exec SubgraphExecutorMap,
61    pub jwt_auth_forwarding: &'exec Option<JwtAuthForwardingPlan>,
62    pub initial_errors: Vec<GraphQLError>,
63}
64
65pub struct PlanExecutionOutput {
66    pub body: Vec<u8>,
67    pub headers: HeaderMap,
68}
69
70pub async fn execute_query_plan<'exec, 'req>(
71    ctx: QueryPlanExecutionContext<'exec, 'req>,
72) -> Result<PlanExecutionOutput, PlanExecutionError> {
73    let init_value = if let Some(introspection_query) = ctx.introspection_context.query {
74        resolve_introspection(introspection_query, ctx.introspection_context)
75    } else if ctx.projection_plan.is_empty() {
76        Value::Null
77    } else {
78        Value::Object(Vec::new())
79    };
80
81    let mut exec_ctx = ExecutionContext::new(ctx.query_plan, init_value, ctx.initial_errors);
82    let executor = Executor::new(
83        ctx.variable_values,
84        ctx.executors,
85        ctx.introspection_context.metadata,
86        ctx.client_request,
87        ctx.headers_plan,
88        ctx.jwt_auth_forwarding,
89        // Deduplicate subgraph requests only if the operation type is a query
90        ctx.operation_type_name == "Query",
91    );
92
93    if ctx.query_plan.node.is_some() {
94        executor
95            .execute(&mut exec_ctx, ctx.query_plan.node.as_ref())
96            .await?;
97    }
98
99    let mut response_headers = HeaderMap::new();
100    modify_client_response_headers(exec_ctx.response_headers_aggregator, &mut response_headers)
101        .with_plan_context(LazyPlanContext {
102            subgraph_name: || None,
103            affected_path: || None,
104        })?;
105
106    let final_response = &exec_ctx.final_response;
107    let body = project_by_operation(
108        final_response,
109        exec_ctx.errors,
110        &ctx.extensions,
111        ctx.operation_type_name,
112        ctx.projection_plan,
113        ctx.variable_values,
114        exec_ctx.response_storage.estimate_final_response_size(),
115    )
116    .with_plan_context(LazyPlanContext {
117        subgraph_name: || None,
118        affected_path: || None,
119    })?;
120
121    Ok(PlanExecutionOutput {
122        body,
123        headers: response_headers,
124    })
125}
126
127pub struct Executor<'exec, 'req> {
128    variable_values: &'exec Option<HashMap<String, sonic_rs::Value>>,
129    schema_metadata: &'exec SchemaMetadata,
130    executors: &'exec SubgraphExecutorMap,
131    client_request: &'exec ClientRequestDetails<'exec, 'req>,
132    headers_plan: &'exec HeaderRulesPlan,
133    jwt_forwarding_plan: &'exec Option<JwtAuthForwardingPlan>,
134    dedupe_subgraph_requests: bool,
135}
136
137struct ConcurrencyScope<'exec, T> {
138    jobs: FuturesUnordered<BoxFuture<'exec, T>>,
139}
140
141impl<'exec, T> ConcurrencyScope<'exec, T> {
142    fn new() -> Self {
143        Self {
144            jobs: FuturesUnordered::new(),
145        }
146    }
147
148    fn spawn(&mut self, future: BoxFuture<'exec, T>) {
149        self.jobs.push(future);
150    }
151
152    async fn join_all(mut self) -> Vec<T> {
153        let mut results = Vec::with_capacity(self.jobs.len());
154        while let Some(result) = self.jobs.next().await {
155            results.push(result);
156        }
157        results
158    }
159}
160
161struct SubgraphOutput {
162    body: Bytes,
163    headers: HeaderMap,
164}
165
166struct FetchJob {
167    fetch_node_id: i64,
168    subgraph_name: String,
169    response: SubgraphOutput,
170}
171
172struct FlattenFetchJob {
173    flatten_node_path: FlattenNodePath,
174    response: SubgraphOutput,
175    fetch_node_id: i64,
176    subgraph_name: String,
177    representation_hashes: Vec<u64>,
178    representation_hash_to_index: HashMap<u64, usize>,
179}
180
181enum ExecutionJob {
182    Fetch(FetchJob),
183    FlattenFetch(FlattenFetchJob),
184    None,
185}
186
187impl From<ExecutionJob> for SubgraphOutput {
188    fn from(value: ExecutionJob) -> Self {
189        match value {
190            ExecutionJob::Fetch(j) => Self {
191                body: j.response.body,
192                headers: j.response.headers,
193            },
194            ExecutionJob::FlattenFetch(j) => Self {
195                body: j.response.body,
196                headers: j.response.headers,
197            },
198            ExecutionJob::None => Self {
199                body: Bytes::new(),
200                headers: HeaderMap::new(),
201            },
202        }
203    }
204}
205
206impl From<HttpExecutionResponse> for SubgraphOutput {
207    fn from(res: HttpExecutionResponse) -> Self {
208        Self {
209            body: res.body,
210            headers: res.headers,
211        }
212    }
213}
214
215struct PreparedFlattenData {
216    representations: Vec<u8>,
217    representation_hashes: Vec<u64>,
218    representation_hash_to_index: HashMap<u64, usize>,
219}
220
221impl<'exec, 'req> Executor<'exec, 'req> {
222    pub fn new(
223        variable_values: &'exec Option<HashMap<String, sonic_rs::Value>>,
224        executors: &'exec SubgraphExecutorMap,
225        schema_metadata: &'exec SchemaMetadata,
226        client_request: &'exec ClientRequestDetails<'exec, 'req>,
227        headers_plan: &'exec HeaderRulesPlan,
228        jwt_forwarding_plan: &'exec Option<JwtAuthForwardingPlan>,
229        dedupe_subgraph_requests: bool,
230    ) -> Self {
231        Executor {
232            variable_values,
233            executors,
234            schema_metadata,
235            client_request,
236            headers_plan,
237            dedupe_subgraph_requests,
238            jwt_forwarding_plan,
239        }
240    }
241
242    pub async fn execute(
243        &self,
244        ctx: &mut ExecutionContext<'exec>,
245        plan: Option<&PlanNode>,
246    ) -> Result<(), PlanExecutionError> {
247        match plan {
248            Some(PlanNode::Fetch(node)) => self.execute_fetch_wave(ctx, node).await,
249            Some(PlanNode::Parallel(node)) => self.execute_parallel_wave(ctx, node).await,
250            Some(PlanNode::Sequence(node)) => self.execute_sequence_wave(ctx, node).await,
251            // Plans produced by our Query Planner can only start with: Fetch, Sequence or Parallel.
252            // Any other node type at the root is not supported, do nothing
253            Some(_) => Ok(()),
254            // An empty plan is valid, just do nothing
255            None => Ok(()),
256        }
257    }
258
259    async fn execute_fetch_wave(
260        &self,
261        ctx: &mut ExecutionContext<'exec>,
262        node: &FetchNode,
263    ) -> Result<(), PlanExecutionError> {
264        match self.execute_fetch_node(node, None).await {
265            Ok(result) => self.process_job_result(ctx, result),
266            Err(err) => {
267                self.log_error(&err);
268                ctx.errors.push(err.into());
269                Ok(())
270            }
271        }
272    }
273
274    async fn execute_sequence_wave(
275        &self,
276        ctx: &mut ExecutionContext<'exec>,
277        node: &SequenceNode,
278    ) -> Result<(), PlanExecutionError> {
279        for child in &node.nodes {
280            Box::pin(self.execute_plan_node(ctx, child)).await?;
281        }
282
283        Ok(())
284    }
285
286    async fn execute_parallel_wave(
287        &self,
288        ctx: &mut ExecutionContext<'exec>,
289        node: &ParallelNode,
290    ) -> Result<(), PlanExecutionError> {
291        let mut scope = ConcurrencyScope::new();
292
293        for child in &node.nodes {
294            let job_future = self.prepare_job_future(child, &ctx.final_response);
295            scope.spawn(job_future);
296        }
297
298        let results = scope.join_all().await;
299
300        for result in results {
301            match result {
302                Ok(job) => {
303                    self.process_job_result(ctx, job)?;
304                }
305                Err(err) => {
306                    self.log_error(&err);
307                    ctx.errors.push(err.into())
308                }
309            }
310        }
311
312        Ok(())
313    }
314
315    async fn execute_plan_node(
316        &self,
317        ctx: &mut ExecutionContext<'exec>,
318        node: &PlanNode,
319    ) -> Result<(), PlanExecutionError> {
320        match node {
321            PlanNode::Fetch(fetch_node) => match self.execute_fetch_node(fetch_node, None).await {
322                Ok(job) => {
323                    self.process_job_result(ctx, job)?;
324                }
325                Err(err) => {
326                    self.log_error(&err);
327                    ctx.errors.push(err.into());
328                }
329            },
330            PlanNode::Parallel(parallel_node) => {
331                self.execute_parallel_wave(ctx, parallel_node).await?;
332            }
333            PlanNode::Flatten(flatten_node) => {
334                match self.prepare_flatten_data(&ctx.final_response, flatten_node) {
335                    Ok(Some(p)) => {
336                        match self
337                            .execute_flatten_fetch_node(
338                                flatten_node,
339                                Some(p.representations),
340                                Some(p.representation_hashes),
341                                Some(p.representation_hash_to_index),
342                            )
343                            .await
344                        {
345                            Ok(job) => {
346                                self.process_job_result(ctx, job)?;
347                            }
348                            Err(err) => {
349                                self.log_error(&err);
350                                ctx.errors.push(err.into());
351                            }
352                        }
353                    }
354                    Ok(None) => { /* do nothing */ }
355                    Err(err) => {
356                        self.log_error(&err);
357                        ctx.errors.push(err.into());
358                    }
359                }
360            }
361            PlanNode::Sequence(sequence_node) => {
362                self.execute_sequence_wave(ctx, sequence_node).await?;
363            }
364            PlanNode::Condition(condition_node) => {
365                if let Some(node) =
366                    condition_node_by_variables(condition_node, self.variable_values)
367                {
368                    Box::pin(self.execute_plan_node(ctx, node)).await?;
369                }
370            }
371            // An unsupported plan node was found, do nothing.
372            _ => {}
373        }
374
375        Ok(())
376    }
377
378    fn prepare_job_future<'wave>(
379        &'wave self,
380        node: &'wave PlanNode,
381        final_response: &Value<'exec>,
382    ) -> BoxFuture<'wave, Result<ExecutionJob, PlanExecutionError>> {
383        match node {
384            PlanNode::Fetch(fetch_node) => Box::pin(self.execute_fetch_node(fetch_node, None)),
385            PlanNode::Flatten(flatten_node) => {
386                match self.prepare_flatten_data(final_response, flatten_node) {
387                    Ok(Some(p)) => Box::pin(self.execute_flatten_fetch_node(
388                        flatten_node,
389                        Some(p.representations),
390                        Some(p.representation_hashes),
391                        Some(p.representation_hash_to_index),
392                    )),
393                    Ok(None) => Box::pin(async { Ok(ExecutionJob::None) }),
394                    Err(e) => Box::pin(async move { Err(e) }),
395                }
396            }
397            PlanNode::Condition(node) => {
398                match condition_node_by_variables(node, self.variable_values) {
399                    Some(node) => Box::pin(self.prepare_job_future(node, final_response)), // This is already clean.
400                    None => Box::pin(async { Ok(ExecutionJob::None) }),
401                }
402            }
403            // Our Query Planner does not produce any other plan node types in ParallelNode
404            _ => Box::pin(async { Ok(ExecutionJob::None) }),
405        }
406    }
407
408    fn process_subgraph_response(
409        &self,
410        subgraph_name: &str,
411        ctx: &mut ExecutionContext<'exec>,
412        response_bytes: Bytes,
413        fetch_node_id: i64,
414    ) -> Option<(SubgraphResponse<'exec>, Option<&'exec Vec<FetchRewrite>>)> {
415        let idx = ctx.response_storage.add_response(response_bytes);
416        // SAFETY: The `bytes` are transmuted to the lifetime `'a` of the `ExecutionContext`.
417        // This is safe because the `response_storage` is part of the `ExecutionContext` (`ctx`)
418        // and will live as long as `'a`. The `Bytes` are stored in an `Arc`, so they won't be
419        // dropped until all references are gone. The `Value`s deserialized from this byte
420        // slice will borrow from it, and they are stored in `ctx.final_response`, which also
421        // lives for `'a`.
422        let bytes: &'exec [u8] =
423            unsafe { std::mem::transmute(ctx.response_storage.get_bytes(idx)) };
424
425        // SAFETY: The `output_rewrites` are transmuted to the lifetime `'a`. This is safe
426        // because `output_rewrites` is part of `OutputRewritesStorage` which is owned by
427        // `ExecutionContext` and lives for `'a`.
428        let output_rewrites: Option<&'exec Vec<FetchRewrite>> =
429            unsafe { std::mem::transmute(ctx.output_rewrites.get(fetch_node_id)) };
430
431        let mut deserializer = sonic_rs::Deserializer::from_slice(bytes);
432        let response = match SubgraphResponse::deserialize(&mut deserializer) {
433            Ok(response) => response,
434            Err(e) => {
435                let message = format!("Failed to deserialize subgraph response: {}", e);
436                let extensions = GraphQLErrorExtensions::new_from_code_and_service_name(
437                    "SUBGRAPH_RESPONSE_DESERIALIZATION_FAILED",
438                    subgraph_name,
439                );
440                let error = GraphQLError::from_message_and_extensions(message, extensions);
441
442                ctx.errors.push(error);
443                return None;
444            }
445        };
446
447        Some((response, output_rewrites))
448    }
449
450    fn process_job_result(
451        &self,
452        ctx: &mut ExecutionContext<'exec>,
453        job: ExecutionJob,
454    ) -> Result<(), PlanExecutionError> {
455        let _: () = match job {
456            ExecutionJob::Fetch(job) => {
457                apply_subgraph_response_headers(
458                    self.headers_plan,
459                    &job.subgraph_name,
460                    &job.response.headers,
461                    self.client_request,
462                    &mut ctx.response_headers_aggregator,
463                )
464                .with_plan_context(LazyPlanContext {
465                    subgraph_name: || Some(job.subgraph_name.clone()),
466                    affected_path: || None,
467                })?;
468
469                if let Some((mut response, output_rewrites)) = self.process_subgraph_response(
470                    job.subgraph_name.as_ref(),
471                    ctx,
472                    job.response.body,
473                    job.fetch_node_id,
474                ) {
475                    ctx.handle_errors(&job.subgraph_name, None, response.errors, None);
476                    if let Some(output_rewrites) = output_rewrites {
477                        for output_rewrite in output_rewrites {
478                            output_rewrite
479                                .rewrite(&self.schema_metadata.possible_types, &mut response.data);
480                        }
481                    }
482
483                    deep_merge(&mut ctx.final_response, response.data);
484                }
485            }
486            ExecutionJob::FlattenFetch(job) => {
487                apply_subgraph_response_headers(
488                    self.headers_plan,
489                    &job.subgraph_name,
490                    &job.response.headers,
491                    self.client_request,
492                    &mut ctx.response_headers_aggregator,
493                )
494                .with_plan_context(LazyPlanContext {
495                    subgraph_name: || Some(job.subgraph_name.clone()),
496                    affected_path: || Some(job.flatten_node_path.to_string()),
497                })?;
498
499                if let Some((mut response, output_rewrites)) = self.process_subgraph_response(
500                    &job.subgraph_name,
501                    ctx,
502                    job.response.body,
503                    job.fetch_node_id,
504                ) {
505                    if let Some(mut entities) = response.data.take_entities() {
506                        if let Some(output_rewrites) = output_rewrites {
507                            for output_rewrite in output_rewrites {
508                                for entity in &mut entities {
509                                    output_rewrite
510                                        .rewrite(&self.schema_metadata.possible_types, entity);
511                                }
512                            }
513                        }
514
515                        let mut index = 0;
516                        let normalized_path = job.flatten_node_path.as_slice();
517                        // If there is an error in the response, then collect the paths for normalizing the error
518                        let initial_error_path = response
519                            .errors
520                            .as_ref()
521                            .map(|_| GraphQLErrorPath::with_capacity(normalized_path.len() + 2));
522                        let mut entity_index_error_map = response
523                            .errors
524                            .as_ref()
525                            .map(|_| HashMap::with_capacity(entities.len()));
526                        traverse_and_callback_mut(
527                            &mut ctx.final_response,
528                            normalized_path,
529                            self.schema_metadata,
530                            initial_error_path,
531                            &mut |target, error_path| {
532                                let hash = job.representation_hashes[index];
533                                if let Some(entity_index) =
534                                    job.representation_hash_to_index.get(&hash)
535                                {
536                                    if let (Some(error_path), Some(entity_index_error_map)) =
537                                        (error_path, entity_index_error_map.as_mut())
538                                    {
539                                        let error_paths = entity_index_error_map
540                                            .entry(entity_index)
541                                            .or_insert_with(Vec::new);
542                                        error_paths.push(error_path);
543                                    }
544                                    if let Some(entity) = entities.get(*entity_index) {
545                                        // SAFETY: `new_val` is a clone of an entity that lives for `'a`.
546                                        // The transmute is to satisfy the compiler, but the lifetime
547                                        // is valid.
548                                        let new_val: Value<'_> =
549                                            unsafe { std::mem::transmute(entity.clone()) };
550                                        deep_merge(target, new_val);
551                                    }
552                                }
553                                index += 1;
554                            },
555                        );
556                        ctx.handle_errors(
557                            &job.subgraph_name,
558                            Some(job.flatten_node_path.to_string()),
559                            response.errors,
560                            entity_index_error_map,
561                        );
562                    } else if let Some(errors) = response.errors {
563                        // No entities were returned, but there are errors to handle.
564                        // We associate them with the flattened path and subgraph.
565                        let affected_path = job.flatten_node_path.to_string();
566                        ctx.errors.extend(errors.into_iter().map(|e| {
567                            e.add_subgraph_name(&job.subgraph_name)
568                                .add_affected_path(affected_path.clone())
569                        }));
570                    }
571                }
572            }
573            ExecutionJob::None => {
574                // nothing to do
575            }
576        };
577        Ok(())
578    }
579
580    fn prepare_flatten_data(
581        &self,
582        final_response: &Value<'exec>,
583        flatten_node: &FlattenNode,
584    ) -> Result<Option<PreparedFlattenData>, PlanExecutionError> {
585        let fetch_node = match flatten_node.node.as_ref() {
586            PlanNode::Fetch(fetch_node) => fetch_node,
587            _ => return Ok(None),
588        };
589        let requires_nodes = match fetch_node.requires.as_ref() {
590            Some(nodes) => nodes,
591            None => return Ok(None),
592        };
593
594        let mut index = 0;
595        let normalized_path = flatten_node.path.as_slice();
596        let mut filtered_representations = Vec::new();
597        filtered_representations.put(OPEN_BRACKET);
598        let proj_ctx = RequestProjectionContext::new(&self.schema_metadata.possible_types);
599        let mut representation_hashes: Vec<u64> = Vec::new();
600        let mut filtered_representations_hashes: HashMap<u64, usize> = HashMap::new();
601        let arena = bumpalo::Bump::new();
602
603        traverse_and_callback(
604            final_response,
605            normalized_path,
606            self.schema_metadata,
607            &mut |entity| {
608                let hash = entity.to_hash(&requires_nodes.items, proj_ctx.possible_types);
609
610                if !entity.is_null() {
611                    representation_hashes.push(hash);
612                }
613
614                if filtered_representations_hashes.contains_key(&hash) {
615                    return Ok::<(), PlanExecutionError>(());
616                }
617
618                let entity = if let Some(input_rewrites) = &fetch_node.input_rewrites {
619                    let new_entity = arena.alloc(entity.clone());
620                    for input_rewrite in input_rewrites {
621                        input_rewrite.rewrite(&self.schema_metadata.possible_types, new_entity);
622                    }
623                    new_entity
624                } else {
625                    entity
626                };
627
628                let is_projected = project_requires(
629                    &proj_ctx,
630                    &requires_nodes.items,
631                    entity,
632                    &mut filtered_representations,
633                    filtered_representations_hashes.is_empty(),
634                    None,
635                )
636                .with_plan_context(LazyPlanContext {
637                    subgraph_name: || Some(fetch_node.service_name.clone()),
638                    affected_path: || Some(flatten_node.path.to_string()),
639                })?;
640
641                if is_projected {
642                    filtered_representations_hashes.insert(hash, index);
643                }
644
645                index += 1;
646
647                Ok(())
648            },
649        )?;
650        filtered_representations.put(CLOSE_BRACKET);
651
652        if filtered_representations_hashes.is_empty() {
653            return Ok(None);
654        }
655
656        Ok(Some(PreparedFlattenData {
657            representations: filtered_representations,
658            representation_hashes,
659            representation_hash_to_index: filtered_representations_hashes,
660        }))
661    }
662
663    async fn execute_flatten_fetch_node(
664        &self,
665        node: &FlattenNode,
666        representations: Option<Vec<u8>>,
667        representation_hashes: Option<Vec<u64>>,
668        filtered_representations_hashes: Option<HashMap<u64, usize>>,
669    ) -> Result<ExecutionJob, PlanExecutionError> {
670        Ok(match node.node.as_ref() {
671            PlanNode::Fetch(fetch_node) => ExecutionJob::FlattenFetch(FlattenFetchJob {
672                flatten_node_path: node.path.clone(),
673                response: self
674                    .execute_fetch_node(fetch_node, representations)
675                    .await?
676                    .into(),
677                fetch_node_id: fetch_node.id,
678                subgraph_name: fetch_node.service_name.clone(),
679                representation_hashes: representation_hashes.unwrap_or_default(),
680                representation_hash_to_index: filtered_representations_hashes.unwrap_or_default(),
681            }),
682            _ => ExecutionJob::None,
683        })
684    }
685
686    async fn execute_fetch_node(
687        &self,
688        node: &FetchNode,
689        representations: Option<Vec<u8>>,
690    ) -> Result<ExecutionJob, PlanExecutionError> {
691        // TODO: We could optimize header map creation by caching them per service name
692        let mut headers_map = HeaderMap::new();
693        modify_subgraph_request_headers(
694            self.headers_plan,
695            &node.service_name,
696            self.client_request,
697            &mut headers_map,
698        )
699        .with_plan_context(LazyPlanContext {
700            subgraph_name: || Some(node.service_name.clone()),
701            affected_path: || None,
702        })?;
703        let variable_refs =
704            select_fetch_variables(self.variable_values, node.variable_usages.as_ref());
705
706        let mut subgraph_request = SubgraphExecutionRequest {
707            query: node.operation.document_str.as_str(),
708            dedupe: self.dedupe_subgraph_requests,
709            operation_name: node.operation_name.as_deref(),
710            variables: variable_refs,
711            representations,
712            headers: headers_map,
713            extensions: None,
714        };
715
716        if let Some(jwt_forwarding_plan) = &self.jwt_forwarding_plan {
717            subgraph_request.add_request_extensions_field(
718                jwt_forwarding_plan.extension_field_name.clone(),
719                jwt_forwarding_plan.extension_field_value.clone(),
720            );
721        }
722
723        Ok(ExecutionJob::Fetch(FetchJob {
724            fetch_node_id: node.id,
725            subgraph_name: node.service_name.clone(),
726            response: self
727                .executors
728                .execute(&node.service_name, subgraph_request, self.client_request)
729                .await
730                .into(),
731        }))
732    }
733
734    fn log_error(&self, error: &PlanExecutionError) {
735        tracing::error!(
736            subgraph_name = error.subgraph_name(),
737            error = error as &dyn std::error::Error,
738            "Plan execution error"
739        );
740    }
741}
742
743fn condition_node_by_variables<'a>(
744    condition_node: &'a ConditionNode,
745    variable_values: &'a Option<HashMap<String, sonic_rs::Value>>,
746) -> Option<&'a PlanNode> {
747    let vars = variable_values.as_ref()?;
748    let value = vars.get(&condition_node.condition)?;
749    let condition_met = matches!(value.as_ref(), ValueRef::Bool(true));
750
751    if condition_met {
752        condition_node.if_clause.as_deref()
753    } else {
754        condition_node.else_clause.as_deref()
755    }
756}
757
758fn select_fetch_variables<'a>(
759    variable_values: &'a Option<HashMap<String, sonic_rs::Value>>,
760    variable_usages: Option<&BTreeSet<String>>,
761) -> Option<HashMap<&'a str, &'a sonic_rs::Value>> {
762    let values = variable_values.as_ref()?;
763
764    variable_usages.map(|variable_usages| {
765        variable_usages
766            .iter()
767            .filter_map(|var_name| {
768                values
769                    .get_key_value(var_name.as_str())
770                    .map(|(key, value)| (key.as_str(), value))
771            })
772            .collect()
773    })
774}
775
776#[cfg(test)]
777mod tests {
778    use crate::{
779        context::ExecutionContext,
780        response::graphql_error::{GraphQLErrorExtensions, GraphQLErrorPath},
781    };
782
783    use super::select_fetch_variables;
784    use sonic_rs::Value;
785    use std::collections::{BTreeSet, HashMap};
786
787    fn value_from_number(n: i32) -> Value {
788        sonic_rs::from_str(&n.to_string()).unwrap()
789    }
790
791    #[test]
792    fn select_fetch_variables_only_used_variables() {
793        let mut variable_values_map = HashMap::new();
794        variable_values_map.insert("used".to_string(), value_from_number(1));
795        variable_values_map.insert("unused".to_string(), value_from_number(2));
796        let variable_values = Some(variable_values_map);
797
798        let mut usages = BTreeSet::new();
799        usages.insert("used".to_string());
800
801        let selected = select_fetch_variables(&variable_values, Some(&usages)).unwrap();
802
803        assert_eq!(selected.len(), 1);
804        assert!(selected.contains_key("used"));
805        assert!(!selected.contains_key("unused"));
806    }
807
808    #[test]
809    fn select_fetch_variables_ignores_missing_usage_entries() {
810        let mut variable_values_map = HashMap::new();
811        variable_values_map.insert("present".to_string(), value_from_number(3));
812        let variable_values = Some(variable_values_map);
813
814        let mut usages = BTreeSet::new();
815        usages.insert("present".to_string());
816        usages.insert("missing".to_string());
817
818        let selected = select_fetch_variables(&variable_values, Some(&usages)).unwrap();
819
820        assert_eq!(selected.len(), 1);
821        assert!(selected.contains_key("present"));
822        assert!(!selected.contains_key("missing"));
823    }
824
825    #[test]
826    fn select_fetch_variables_for_no_usage_entries() {
827        let mut variable_values_map = HashMap::new();
828        variable_values_map.insert("unused_1".to_string(), value_from_number(1));
829        variable_values_map.insert("unused_2".to_string(), value_from_number(2));
830
831        let variable_values = Some(variable_values_map);
832
833        let selected = select_fetch_variables(&variable_values, None);
834
835        assert!(selected.is_none());
836    }
837    #[test]
838    /**
839     * We have the same entity in two different paths ["a", 0] and ["b", 1],
840     * and the subgraph response has an error for this entity.
841     * So we should duplicate the error for both paths.
842     */
843    fn normalize_entity_errors_correctly() {
844        use crate::response::graphql_error::{GraphQLError, GraphQLErrorPathSegment};
845        use std::collections::HashMap;
846        let mut ctx = ExecutionContext::default();
847        let mut entity_index_error_map: HashMap<&usize, Vec<GraphQLErrorPath>> = HashMap::new();
848        entity_index_error_map.insert(
849            &0,
850            vec![
851                GraphQLErrorPath {
852                    segments: vec![
853                        GraphQLErrorPathSegment::String("a".to_string()),
854                        GraphQLErrorPathSegment::Index(0),
855                    ],
856                },
857                GraphQLErrorPath {
858                    segments: vec![
859                        GraphQLErrorPathSegment::String("b".to_string()),
860                        GraphQLErrorPathSegment::Index(1),
861                    ],
862                },
863            ],
864        );
865        let response_errors = vec![GraphQLError {
866            message: "Error 1".to_string(),
867            locations: None,
868            path: Some(GraphQLErrorPath {
869                segments: vec![
870                    GraphQLErrorPathSegment::String("_entities".to_string()),
871                    GraphQLErrorPathSegment::Index(0),
872                    GraphQLErrorPathSegment::String("field1".to_string()),
873                ],
874            }),
875            extensions: GraphQLErrorExtensions::default(),
876        }];
877        ctx.handle_errors(
878            "subgraph_a",
879            None,
880            Some(response_errors),
881            Some(entity_index_error_map),
882        );
883        assert_eq!(ctx.errors.len(), 2);
884        assert_eq!(ctx.errors[0].message, "Error 1");
885        assert_eq!(
886            ctx.errors[0].path.as_ref().unwrap().segments,
887            vec![
888                GraphQLErrorPathSegment::String("a".to_string()),
889                GraphQLErrorPathSegment::Index(0),
890                GraphQLErrorPathSegment::String("field1".to_string())
891            ]
892        );
893        assert_eq!(ctx.errors[1].message, "Error 1");
894        assert_eq!(
895            ctx.errors[1].path.as_ref().unwrap().segments,
896            vec![
897                GraphQLErrorPathSegment::String("b".to_string()),
898                GraphQLErrorPathSegment::Index(1),
899                GraphQLErrorPathSegment::String("field1".to_string())
900            ]
901        );
902    }
903}