1use std::collections::{BTreeSet, HashMap};
2
3use bytes::{BufMut, Bytes};
4use futures::{future::BoxFuture, stream::FuturesUnordered, StreamExt};
5use hive_router_query_planner::planner::plan_nodes::{
6 ConditionNode, FetchNode, FetchRewrite, FlattenNode, FlattenNodePath, ParallelNode, PlanNode,
7 QueryPlan, SequenceNode,
8};
9use http::HeaderMap;
10use serde::Deserialize;
11use sonic_rs::ValueRef;
12
13use crate::{
14 context::ExecutionContext,
15 execution::{
16 client_request_details::ClientRequestDetails,
17 error::{IntoPlanExecutionError, LazyPlanContext, PlanExecutionError},
18 jwt_forward::JwtAuthForwardingPlan,
19 rewrites::FetchRewriteExt,
20 },
21 executors::{
22 common::{HttpExecutionResponse, SubgraphExecutionRequest},
23 map::SubgraphExecutorMap,
24 },
25 headers::{
26 plan::HeaderRulesPlan,
27 request::modify_subgraph_request_headers,
28 response::{apply_subgraph_response_headers, modify_client_response_headers},
29 },
30 introspection::{
31 resolve::{resolve_introspection, IntrospectionContext},
32 schema::SchemaMetadata,
33 },
34 projection::{
35 plan::FieldProjectionPlan,
36 request::{project_requires, RequestProjectionContext},
37 response::project_by_operation,
38 },
39 response::{
40 graphql_error::{GraphQLError, GraphQLErrorExtensions, GraphQLErrorPath},
41 merge::deep_merge,
42 subgraph_response::SubgraphResponse,
43 value::Value,
44 },
45 utils::{
46 consts::{CLOSE_BRACKET, OPEN_BRACKET},
47 traverse::{traverse_and_callback, traverse_and_callback_mut},
48 },
49};
50
/// Inputs consumed by [`execute_query_plan`] to run one query plan and build the response.
///
/// Most fields are borrowed for `'exec`; `extensions` and `initial_errors` are owned.
pub struct QueryPlanExecutionContext<'exec, 'req> {
    /// Plan whose root node (if any) is executed.
    pub query_plan: &'exec QueryPlan,
    /// Projection plan used to shape the final response. When it is empty and there is no
    /// introspection query, the initial response value is `Value::Null`.
    pub projection_plan: &'exec Vec<FieldProjectionPlan>,
    /// Header rules applied to subgraph requests and to subgraph response headers.
    pub headers_plan: &'exec HeaderRulesPlan,
    /// Operation variables; used to pick condition branches, to select per-fetch variables,
    /// and passed to response projection.
    pub variable_values: &'exec Option<HashMap<String, sonic_rs::Value>>,
    /// Passed to response projection together with the merged data and errors.
    pub extensions: Option<HashMap<String, sonic_rs::Value>>,
    /// Details of the incoming client request, handed to header handling and to executors.
    pub client_request: &'exec ClientRequestDetails<'exec, 'req>,
    /// Carries the optional introspection query and the schema metadata used by the executor.
    pub introspection_context: &'exec IntrospectionContext<'exec, 'static>,
    /// Operation type name; subgraph request deduplication is enabled only when it equals
    /// `"Query"`.
    pub operation_type_name: &'exec str,
    /// Subgraph executors used to send fetch requests.
    pub executors: &'exec SubgraphExecutorMap,
    /// When `Some`, its extension field is attached to every subgraph request.
    pub jwt_auth_forwarding: &'exec Option<JwtAuthForwardingPlan>,
    /// Errors the execution context is seeded with before any plan node runs.
    pub initial_errors: Vec<GraphQLError>,
}
64
/// Result of [`execute_query_plan`]: the projected response body and the client headers.
pub struct PlanExecutionOutput {
    /// Response body bytes returned by `project_by_operation`.
    pub body: Vec<u8>,
    /// Headers filled in by `modify_client_response_headers`.
    pub headers: HeaderMap,
}
69
70pub async fn execute_query_plan<'exec, 'req>(
71 ctx: QueryPlanExecutionContext<'exec, 'req>,
72) -> Result<PlanExecutionOutput, PlanExecutionError> {
73 let init_value = if let Some(introspection_query) = ctx.introspection_context.query {
74 resolve_introspection(introspection_query, ctx.introspection_context)
75 } else if ctx.projection_plan.is_empty() {
76 Value::Null
77 } else {
78 Value::Object(Vec::new())
79 };
80
81 let mut exec_ctx = ExecutionContext::new(ctx.query_plan, init_value, ctx.initial_errors);
82 let executor = Executor::new(
83 ctx.variable_values,
84 ctx.executors,
85 ctx.introspection_context.metadata,
86 ctx.client_request,
87 ctx.headers_plan,
88 ctx.jwt_auth_forwarding,
89 ctx.operation_type_name == "Query",
91 );
92
93 if ctx.query_plan.node.is_some() {
94 executor
95 .execute(&mut exec_ctx, ctx.query_plan.node.as_ref())
96 .await?;
97 }
98
99 let mut response_headers = HeaderMap::new();
100 modify_client_response_headers(exec_ctx.response_headers_aggregator, &mut response_headers)
101 .with_plan_context(LazyPlanContext {
102 subgraph_name: || None,
103 affected_path: || None,
104 })?;
105
106 let final_response = &exec_ctx.final_response;
107 let body = project_by_operation(
108 final_response,
109 exec_ctx.errors,
110 &ctx.extensions,
111 ctx.operation_type_name,
112 ctx.projection_plan,
113 ctx.variable_values,
114 exec_ctx.response_storage.estimate_final_response_size(),
115 )
116 .with_plan_context(LazyPlanContext {
117 subgraph_name: || None,
118 affected_path: || None,
119 })?;
120
121 Ok(PlanExecutionOutput {
122 body,
123 headers: response_headers,
124 })
125}
126
/// Executes query plan nodes against subgraphs and merges their responses into an
/// `ExecutionContext`.
pub struct Executor<'exec, 'req> {
    /// Operation variables; used for condition nodes and per-fetch variable selection.
    variable_values: &'exec Option<HashMap<String, sonic_rs::Value>>,
    /// Supplies `possible_types` for rewrites and projection, and is passed to traversal.
    schema_metadata: &'exec SchemaMetadata,
    /// Sends requests to subgraphs.
    executors: &'exec SubgraphExecutorMap,
    /// Incoming client request details, used by header rules and passed to executors.
    client_request: &'exec ClientRequestDetails<'exec, 'req>,
    /// Header rules for subgraph requests and subgraph responses.
    headers_plan: &'exec HeaderRulesPlan,
    /// When `Some`, its extension field is added to each subgraph request.
    jwt_forwarding_plan: &'exec Option<JwtAuthForwardingPlan>,
    /// Copied into the `dedupe` flag of every subgraph request.
    dedupe_subgraph_requests: bool,
}
136
/// A set of boxed futures that are polled together until all of them have completed.
struct ConcurrencyScope<'exec, T> {
    /// Pending jobs; results are yielded by the underlying `FuturesUnordered` stream.
    jobs: FuturesUnordered<BoxFuture<'exec, T>>,
}
140
141impl<'exec, T> ConcurrencyScope<'exec, T> {
142 fn new() -> Self {
143 Self {
144 jobs: FuturesUnordered::new(),
145 }
146 }
147
148 fn spawn(&mut self, future: BoxFuture<'exec, T>) {
149 self.jobs.push(future);
150 }
151
152 async fn join_all(mut self) -> Vec<T> {
153 let mut results = Vec::with_capacity(self.jobs.len());
154 while let Some(result) = self.jobs.next().await {
155 results.push(result);
156 }
157 results
158 }
159}
160
/// Raw body and headers of a subgraph response, before deserialization.
struct SubgraphOutput {
    /// Response body bytes.
    body: Bytes,
    /// Response headers.
    headers: HeaderMap,
}
165
/// Outcome of executing a single fetch node.
struct FetchJob {
    /// Id of the fetch node; used to look up its output rewrites.
    fetch_node_id: i64,
    /// Name of the subgraph the request was sent to.
    subgraph_name: String,
    /// Raw response received for the fetch.
    response: SubgraphOutput,
}
171
/// Outcome of executing the fetch wrapped by a flatten node.
struct FlattenFetchJob {
    /// Path of the flatten node; used to locate merge targets and to annotate errors.
    flatten_node_path: FlattenNodePath,
    /// Raw response received for the fetch.
    response: SubgraphOutput,
    /// Id of the wrapped fetch node; used to look up its output rewrites.
    fetch_node_id: i64,
    /// Name of the subgraph the request was sent to.
    subgraph_name: String,
    /// Hashes recorded while collecting representations, in traversal order.
    representation_hashes: Vec<u64>,
    /// Maps a representation hash to the index used to pick its entity from the response.
    representation_hash_to_index: HashMap<u64, usize>,
}
180
/// Result of running one plan node as a job.
enum ExecutionJob {
    /// A plain fetch completed.
    Fetch(FetchJob),
    /// A fetch wrapped in a flatten node completed.
    FlattenFetch(FlattenFetchJob),
    /// Nothing was executed, so there is nothing to merge.
    None,
}
186
187impl From<ExecutionJob> for SubgraphOutput {
188 fn from(value: ExecutionJob) -> Self {
189 match value {
190 ExecutionJob::Fetch(j) => Self {
191 body: j.response.body,
192 headers: j.response.headers,
193 },
194 ExecutionJob::FlattenFetch(j) => Self {
195 body: j.response.body,
196 headers: j.response.headers,
197 },
198 ExecutionJob::None => Self {
199 body: Bytes::new(),
200 headers: HeaderMap::new(),
201 },
202 }
203 }
204}
205
206impl From<HttpExecutionResponse> for SubgraphOutput {
207 fn from(res: HttpExecutionResponse) -> Self {
208 Self {
209 body: res.body,
210 headers: res.headers,
211 }
212 }
213}
214
/// Data collected for a flatten node before its fetch is sent.
struct PreparedFlattenData {
    /// Serialized representations: `OPEN_BRACKET`, the projected entities, `CLOSE_BRACKET`.
    representations: Vec<u8>,
    /// Hash of every non-null entity visited, in traversal order.
    representation_hashes: Vec<u64>,
    /// Maps the hash of each projected entity to the index recorded for it.
    representation_hash_to_index: HashMap<u64, usize>,
}
220
221impl<'exec, 'req> Executor<'exec, 'req> {
222 pub fn new(
223 variable_values: &'exec Option<HashMap<String, sonic_rs::Value>>,
224 executors: &'exec SubgraphExecutorMap,
225 schema_metadata: &'exec SchemaMetadata,
226 client_request: &'exec ClientRequestDetails<'exec, 'req>,
227 headers_plan: &'exec HeaderRulesPlan,
228 jwt_forwarding_plan: &'exec Option<JwtAuthForwardingPlan>,
229 dedupe_subgraph_requests: bool,
230 ) -> Self {
231 Executor {
232 variable_values,
233 executors,
234 schema_metadata,
235 client_request,
236 headers_plan,
237 dedupe_subgraph_requests,
238 jwt_forwarding_plan,
239 }
240 }
241
242 pub async fn execute(
243 &self,
244 ctx: &mut ExecutionContext<'exec>,
245 plan: Option<&PlanNode>,
246 ) -> Result<(), PlanExecutionError> {
247 match plan {
248 Some(PlanNode::Fetch(node)) => self.execute_fetch_wave(ctx, node).await,
249 Some(PlanNode::Parallel(node)) => self.execute_parallel_wave(ctx, node).await,
250 Some(PlanNode::Sequence(node)) => self.execute_sequence_wave(ctx, node).await,
251 Some(_) => Ok(()),
254 None => Ok(()),
256 }
257 }
258
259 async fn execute_fetch_wave(
260 &self,
261 ctx: &mut ExecutionContext<'exec>,
262 node: &FetchNode,
263 ) -> Result<(), PlanExecutionError> {
264 match self.execute_fetch_node(node, None).await {
265 Ok(result) => self.process_job_result(ctx, result),
266 Err(err) => {
267 self.log_error(&err);
268 ctx.errors.push(err.into());
269 Ok(())
270 }
271 }
272 }
273
274 async fn execute_sequence_wave(
275 &self,
276 ctx: &mut ExecutionContext<'exec>,
277 node: &SequenceNode,
278 ) -> Result<(), PlanExecutionError> {
279 for child in &node.nodes {
280 Box::pin(self.execute_plan_node(ctx, child)).await?;
281 }
282
283 Ok(())
284 }
285
286 async fn execute_parallel_wave(
287 &self,
288 ctx: &mut ExecutionContext<'exec>,
289 node: &ParallelNode,
290 ) -> Result<(), PlanExecutionError> {
291 let mut scope = ConcurrencyScope::new();
292
293 for child in &node.nodes {
294 let job_future = self.prepare_job_future(child, &ctx.final_response);
295 scope.spawn(job_future);
296 }
297
298 let results = scope.join_all().await;
299
300 for result in results {
301 match result {
302 Ok(job) => {
303 self.process_job_result(ctx, job)?;
304 }
305 Err(err) => {
306 self.log_error(&err);
307 ctx.errors.push(err.into())
308 }
309 }
310 }
311
312 Ok(())
313 }
314
315 async fn execute_plan_node(
316 &self,
317 ctx: &mut ExecutionContext<'exec>,
318 node: &PlanNode,
319 ) -> Result<(), PlanExecutionError> {
320 match node {
321 PlanNode::Fetch(fetch_node) => match self.execute_fetch_node(fetch_node, None).await {
322 Ok(job) => {
323 self.process_job_result(ctx, job)?;
324 }
325 Err(err) => {
326 self.log_error(&err);
327 ctx.errors.push(err.into());
328 }
329 },
330 PlanNode::Parallel(parallel_node) => {
331 self.execute_parallel_wave(ctx, parallel_node).await?;
332 }
333 PlanNode::Flatten(flatten_node) => {
334 match self.prepare_flatten_data(&ctx.final_response, flatten_node) {
335 Ok(Some(p)) => {
336 match self
337 .execute_flatten_fetch_node(
338 flatten_node,
339 Some(p.representations),
340 Some(p.representation_hashes),
341 Some(p.representation_hash_to_index),
342 )
343 .await
344 {
345 Ok(job) => {
346 self.process_job_result(ctx, job)?;
347 }
348 Err(err) => {
349 self.log_error(&err);
350 ctx.errors.push(err.into());
351 }
352 }
353 }
354 Ok(None) => { }
355 Err(err) => {
356 self.log_error(&err);
357 ctx.errors.push(err.into());
358 }
359 }
360 }
361 PlanNode::Sequence(sequence_node) => {
362 self.execute_sequence_wave(ctx, sequence_node).await?;
363 }
364 PlanNode::Condition(condition_node) => {
365 if let Some(node) =
366 condition_node_by_variables(condition_node, self.variable_values)
367 {
368 Box::pin(self.execute_plan_node(ctx, node)).await?;
369 }
370 }
371 _ => {}
373 }
374
375 Ok(())
376 }
377
378 fn prepare_job_future<'wave>(
379 &'wave self,
380 node: &'wave PlanNode,
381 final_response: &Value<'exec>,
382 ) -> BoxFuture<'wave, Result<ExecutionJob, PlanExecutionError>> {
383 match node {
384 PlanNode::Fetch(fetch_node) => Box::pin(self.execute_fetch_node(fetch_node, None)),
385 PlanNode::Flatten(flatten_node) => {
386 match self.prepare_flatten_data(final_response, flatten_node) {
387 Ok(Some(p)) => Box::pin(self.execute_flatten_fetch_node(
388 flatten_node,
389 Some(p.representations),
390 Some(p.representation_hashes),
391 Some(p.representation_hash_to_index),
392 )),
393 Ok(None) => Box::pin(async { Ok(ExecutionJob::None) }),
394 Err(e) => Box::pin(async move { Err(e) }),
395 }
396 }
397 PlanNode::Condition(node) => {
398 match condition_node_by_variables(node, self.variable_values) {
399 Some(node) => Box::pin(self.prepare_job_future(node, final_response)), None => Box::pin(async { Ok(ExecutionJob::None) }),
401 }
402 }
403 _ => Box::pin(async { Ok(ExecutionJob::None) }),
405 }
406 }
407
408 fn process_subgraph_response(
409 &self,
410 subgraph_name: &str,
411 ctx: &mut ExecutionContext<'exec>,
412 response_bytes: Bytes,
413 fetch_node_id: i64,
414 ) -> Option<(SubgraphResponse<'exec>, Option<&'exec Vec<FetchRewrite>>)> {
415 let idx = ctx.response_storage.add_response(response_bytes);
416 let bytes: &'exec [u8] =
423 unsafe { std::mem::transmute(ctx.response_storage.get_bytes(idx)) };
424
425 let output_rewrites: Option<&'exec Vec<FetchRewrite>> =
429 unsafe { std::mem::transmute(ctx.output_rewrites.get(fetch_node_id)) };
430
431 let mut deserializer = sonic_rs::Deserializer::from_slice(bytes);
432 let response = match SubgraphResponse::deserialize(&mut deserializer) {
433 Ok(response) => response,
434 Err(e) => {
435 let message = format!("Failed to deserialize subgraph response: {}", e);
436 let extensions = GraphQLErrorExtensions::new_from_code_and_service_name(
437 "SUBGRAPH_RESPONSE_DESERIALIZATION_FAILED",
438 subgraph_name,
439 );
440 let error = GraphQLError::from_message_and_extensions(message, extensions);
441
442 ctx.errors.push(error);
443 return None;
444 }
445 };
446
447 Some((response, output_rewrites))
448 }
449
    /// Applies the result of a finished job to the execution context.
    ///
    /// For both job kinds the subgraph response headers are first fed into
    /// `ctx.response_headers_aggregator`, then the body is stored and deserialized via
    /// `process_subgraph_response` (which records a GraphQL error and yields `None` on
    /// malformed bodies).
    ///
    /// * `Fetch`: subgraph errors are handed to `ctx.handle_errors`, output rewrites are
    ///   applied to the response data, and the data is deep-merged into
    ///   `ctx.final_response`.
    /// * `FlattenFetch`: the returned entities are rewritten and merged into the targets
    ///   found by walking `ctx.final_response` along the flatten path. When the response
    ///   has no entities but has errors, the errors are recorded with the subgraph name and
    ///   the flatten path.
    /// * `None`: nothing to do.
    ///
    /// # Errors
    ///
    /// Returns an error only when applying the subgraph response headers fails.
    ///
    /// # Panics
    ///
    /// The flatten branch indexes `representation_hashes` without a bounds check, so it
    /// panics if the traversal visits more targets than hashes were recorded.
    fn process_job_result(
        &self,
        ctx: &mut ExecutionContext<'exec>,
        job: ExecutionJob,
    ) -> Result<(), PlanExecutionError> {
        let _: () = match job {
            ExecutionJob::Fetch(job) => {
                apply_subgraph_response_headers(
                    self.headers_plan,
                    &job.subgraph_name,
                    &job.response.headers,
                    self.client_request,
                    &mut ctx.response_headers_aggregator,
                )
                .with_plan_context(LazyPlanContext {
                    subgraph_name: || Some(job.subgraph_name.clone()),
                    affected_path: || None,
                })?;

                if let Some((mut response, output_rewrites)) = self.process_subgraph_response(
                    job.subgraph_name.as_ref(),
                    ctx,
                    job.response.body,
                    job.fetch_node_id,
                ) {
                    ctx.handle_errors(&job.subgraph_name, None, response.errors, None);
                    if let Some(output_rewrites) = output_rewrites {
                        for output_rewrite in output_rewrites {
                            output_rewrite
                                .rewrite(&self.schema_metadata.possible_types, &mut response.data);
                        }
                    }

                    deep_merge(&mut ctx.final_response, response.data);
                }
            }
            ExecutionJob::FlattenFetch(job) => {
                apply_subgraph_response_headers(
                    self.headers_plan,
                    &job.subgraph_name,
                    &job.response.headers,
                    self.client_request,
                    &mut ctx.response_headers_aggregator,
                )
                .with_plan_context(LazyPlanContext {
                    subgraph_name: || Some(job.subgraph_name.clone()),
                    affected_path: || Some(job.flatten_node_path.to_string()),
                })?;

                if let Some((mut response, output_rewrites)) = self.process_subgraph_response(
                    &job.subgraph_name,
                    ctx,
                    job.response.body,
                    job.fetch_node_id,
                ) {
                    if let Some(mut entities) = response.data.take_entities() {
                        // Every output rewrite is applied to every returned entity.
                        if let Some(output_rewrites) = output_rewrites {
                            for output_rewrite in output_rewrites {
                                for entity in &mut entities {
                                    output_rewrite
                                        .rewrite(&self.schema_metadata.possible_types, entity);
                                }
                            }
                        }

                        // Counts the targets visited so far; selects the hash recorded for
                        // the current target.
                        let mut index = 0;
                        let normalized_path = job.flatten_node_path.as_slice();
                        // Error paths and the entity-index -> paths map are only built when
                        // the response actually carries errors.
                        let initial_error_path = response
                            .errors
                            .as_ref()
                            .map(|_| GraphQLErrorPath::with_capacity(normalized_path.len() + 2));
                        let mut entity_index_error_map = response
                            .errors
                            .as_ref()
                            .map(|_| HashMap::with_capacity(entities.len()));
                        traverse_and_callback_mut(
                            &mut ctx.final_response,
                            normalized_path,
                            self.schema_metadata,
                            initial_error_path,
                            &mut |target, error_path| {
                                // Unchecked index: panics if `index` runs past the recorded
                                // hashes.
                                let hash = job.representation_hashes[index];
                                if let Some(entity_index) =
                                    job.representation_hash_to_index.get(&hash)
                                {
                                    // Remember where in the final response this entity
                                    // lives so its errors can be reported at that path.
                                    if let (Some(error_path), Some(entity_index_error_map)) =
                                        (error_path, entity_index_error_map.as_mut())
                                    {
                                        let error_paths = entity_index_error_map
                                            .entry(entity_index)
                                            .or_insert_with(Vec::new);
                                        error_paths.push(error_path);
                                    }
                                    if let Some(entity) = entities.get(*entity_index) {
                                        // SAFETY: NOTE(review) - the transmute only adjusts
                                        // the lifetime parameter of the cloned value;
                                        // presumably the clone borrows from response bytes
                                        // kept alive by the execution context. Confirm.
                                        let new_val: Value<'_> =
                                            unsafe { std::mem::transmute(entity.clone()) };
                                        deep_merge(target, new_val);
                                    }
                                }
                                index += 1;
                            },
                        );
                        ctx.handle_errors(
                            &job.subgraph_name,
                            Some(job.flatten_node_path.to_string()),
                            response.errors,
                            entity_index_error_map,
                        );
                    } else if let Some(errors) = response.errors {
                        // No entities came back: record the errors with the subgraph name
                        // and the flatten path attached.
                        let affected_path = job.flatten_node_path.to_string();
                        ctx.errors.extend(errors.into_iter().map(|e| {
                            e.add_subgraph_name(&job.subgraph_name)
                                .add_affected_path(affected_path.clone())
                        }));
                    }
                }
            }
            ExecutionJob::None => {
            }
        };
        Ok(())
    }
579
580 fn prepare_flatten_data(
581 &self,
582 final_response: &Value<'exec>,
583 flatten_node: &FlattenNode,
584 ) -> Result<Option<PreparedFlattenData>, PlanExecutionError> {
585 let fetch_node = match flatten_node.node.as_ref() {
586 PlanNode::Fetch(fetch_node) => fetch_node,
587 _ => return Ok(None),
588 };
589 let requires_nodes = match fetch_node.requires.as_ref() {
590 Some(nodes) => nodes,
591 None => return Ok(None),
592 };
593
594 let mut index = 0;
595 let normalized_path = flatten_node.path.as_slice();
596 let mut filtered_representations = Vec::new();
597 filtered_representations.put(OPEN_BRACKET);
598 let proj_ctx = RequestProjectionContext::new(&self.schema_metadata.possible_types);
599 let mut representation_hashes: Vec<u64> = Vec::new();
600 let mut filtered_representations_hashes: HashMap<u64, usize> = HashMap::new();
601 let arena = bumpalo::Bump::new();
602
603 traverse_and_callback(
604 final_response,
605 normalized_path,
606 self.schema_metadata,
607 &mut |entity| {
608 let hash = entity.to_hash(&requires_nodes.items, proj_ctx.possible_types);
609
610 if !entity.is_null() {
611 representation_hashes.push(hash);
612 }
613
614 if filtered_representations_hashes.contains_key(&hash) {
615 return Ok::<(), PlanExecutionError>(());
616 }
617
618 let entity = if let Some(input_rewrites) = &fetch_node.input_rewrites {
619 let new_entity = arena.alloc(entity.clone());
620 for input_rewrite in input_rewrites {
621 input_rewrite.rewrite(&self.schema_metadata.possible_types, new_entity);
622 }
623 new_entity
624 } else {
625 entity
626 };
627
628 let is_projected = project_requires(
629 &proj_ctx,
630 &requires_nodes.items,
631 entity,
632 &mut filtered_representations,
633 filtered_representations_hashes.is_empty(),
634 None,
635 )
636 .with_plan_context(LazyPlanContext {
637 subgraph_name: || Some(fetch_node.service_name.clone()),
638 affected_path: || Some(flatten_node.path.to_string()),
639 })?;
640
641 if is_projected {
642 filtered_representations_hashes.insert(hash, index);
643 }
644
645 index += 1;
646
647 Ok(())
648 },
649 )?;
650 filtered_representations.put(CLOSE_BRACKET);
651
652 if filtered_representations_hashes.is_empty() {
653 return Ok(None);
654 }
655
656 Ok(Some(PreparedFlattenData {
657 representations: filtered_representations,
658 representation_hashes,
659 representation_hash_to_index: filtered_representations_hashes,
660 }))
661 }
662
663 async fn execute_flatten_fetch_node(
664 &self,
665 node: &FlattenNode,
666 representations: Option<Vec<u8>>,
667 representation_hashes: Option<Vec<u64>>,
668 filtered_representations_hashes: Option<HashMap<u64, usize>>,
669 ) -> Result<ExecutionJob, PlanExecutionError> {
670 Ok(match node.node.as_ref() {
671 PlanNode::Fetch(fetch_node) => ExecutionJob::FlattenFetch(FlattenFetchJob {
672 flatten_node_path: node.path.clone(),
673 response: self
674 .execute_fetch_node(fetch_node, representations)
675 .await?
676 .into(),
677 fetch_node_id: fetch_node.id,
678 subgraph_name: fetch_node.service_name.clone(),
679 representation_hashes: representation_hashes.unwrap_or_default(),
680 representation_hash_to_index: filtered_representations_hashes.unwrap_or_default(),
681 }),
682 _ => ExecutionJob::None,
683 })
684 }
685
686 async fn execute_fetch_node(
687 &self,
688 node: &FetchNode,
689 representations: Option<Vec<u8>>,
690 ) -> Result<ExecutionJob, PlanExecutionError> {
691 let mut headers_map = HeaderMap::new();
693 modify_subgraph_request_headers(
694 self.headers_plan,
695 &node.service_name,
696 self.client_request,
697 &mut headers_map,
698 )
699 .with_plan_context(LazyPlanContext {
700 subgraph_name: || Some(node.service_name.clone()),
701 affected_path: || None,
702 })?;
703 let variable_refs =
704 select_fetch_variables(self.variable_values, node.variable_usages.as_ref());
705
706 let mut subgraph_request = SubgraphExecutionRequest {
707 query: node.operation.document_str.as_str(),
708 dedupe: self.dedupe_subgraph_requests,
709 operation_name: node.operation_name.as_deref(),
710 variables: variable_refs,
711 representations,
712 headers: headers_map,
713 extensions: None,
714 };
715
716 if let Some(jwt_forwarding_plan) = &self.jwt_forwarding_plan {
717 subgraph_request.add_request_extensions_field(
718 jwt_forwarding_plan.extension_field_name.clone(),
719 jwt_forwarding_plan.extension_field_value.clone(),
720 );
721 }
722
723 Ok(ExecutionJob::Fetch(FetchJob {
724 fetch_node_id: node.id,
725 subgraph_name: node.service_name.clone(),
726 response: self
727 .executors
728 .execute(&node.service_name, subgraph_request, self.client_request)
729 .await
730 .into(),
731 }))
732 }
733
    /// Emits a `tracing` error event for a plan execution failure, recording the subgraph
    /// name reported by the error and the error itself as structured fields.
    fn log_error(&self, error: &PlanExecutionError) {
        tracing::error!(
            subgraph_name = error.subgraph_name(),
            // Cast to a trait object so the value is recorded as an error.
            error = error as &dyn std::error::Error,
            "Plan execution error"
        );
    }
741}
742
743fn condition_node_by_variables<'a>(
744 condition_node: &'a ConditionNode,
745 variable_values: &'a Option<HashMap<String, sonic_rs::Value>>,
746) -> Option<&'a PlanNode> {
747 let vars = variable_values.as_ref()?;
748 let value = vars.get(&condition_node.condition)?;
749 let condition_met = matches!(value.as_ref(), ValueRef::Bool(true));
750
751 if condition_met {
752 condition_node.if_clause.as_deref()
753 } else {
754 condition_node.else_clause.as_deref()
755 }
756}
757
758fn select_fetch_variables<'a>(
759 variable_values: &'a Option<HashMap<String, sonic_rs::Value>>,
760 variable_usages: Option<&BTreeSet<String>>,
761) -> Option<HashMap<&'a str, &'a sonic_rs::Value>> {
762 let values = variable_values.as_ref()?;
763
764 variable_usages.map(|variable_usages| {
765 variable_usages
766 .iter()
767 .filter_map(|var_name| {
768 values
769 .get_key_value(var_name.as_str())
770 .map(|(key, value)| (key.as_str(), value))
771 })
772 .collect()
773 })
774}
775
776#[cfg(test)]
777mod tests {
778 use crate::{
779 context::ExecutionContext,
780 response::graphql_error::{GraphQLErrorExtensions, GraphQLErrorPath},
781 };
782
783 use super::select_fetch_variables;
784 use sonic_rs::Value;
785 use std::collections::{BTreeSet, HashMap};
786
787 fn value_from_number(n: i32) -> Value {
788 sonic_rs::from_str(&n.to_string()).unwrap()
789 }
790
791 #[test]
792 fn select_fetch_variables_only_used_variables() {
793 let mut variable_values_map = HashMap::new();
794 variable_values_map.insert("used".to_string(), value_from_number(1));
795 variable_values_map.insert("unused".to_string(), value_from_number(2));
796 let variable_values = Some(variable_values_map);
797
798 let mut usages = BTreeSet::new();
799 usages.insert("used".to_string());
800
801 let selected = select_fetch_variables(&variable_values, Some(&usages)).unwrap();
802
803 assert_eq!(selected.len(), 1);
804 assert!(selected.contains_key("used"));
805 assert!(!selected.contains_key("unused"));
806 }
807
808 #[test]
809 fn select_fetch_variables_ignores_missing_usage_entries() {
810 let mut variable_values_map = HashMap::new();
811 variable_values_map.insert("present".to_string(), value_from_number(3));
812 let variable_values = Some(variable_values_map);
813
814 let mut usages = BTreeSet::new();
815 usages.insert("present".to_string());
816 usages.insert("missing".to_string());
817
818 let selected = select_fetch_variables(&variable_values, Some(&usages)).unwrap();
819
820 assert_eq!(selected.len(), 1);
821 assert!(selected.contains_key("present"));
822 assert!(!selected.contains_key("missing"));
823 }
824
825 #[test]
826 fn select_fetch_variables_for_no_usage_entries() {
827 let mut variable_values_map = HashMap::new();
828 variable_values_map.insert("unused_1".to_string(), value_from_number(1));
829 variable_values_map.insert("unused_2".to_string(), value_from_number(2));
830
831 let variable_values = Some(variable_values_map);
832
833 let selected = select_fetch_variables(&variable_values, None);
834
835 assert!(selected.is_none());
836 }
837 #[test]
838 fn normalize_entity_errors_correctly() {
844 use crate::response::graphql_error::{GraphQLError, GraphQLErrorPathSegment};
845 use std::collections::HashMap;
846 let mut ctx = ExecutionContext::default();
847 let mut entity_index_error_map: HashMap<&usize, Vec<GraphQLErrorPath>> = HashMap::new();
848 entity_index_error_map.insert(
849 &0,
850 vec![
851 GraphQLErrorPath {
852 segments: vec![
853 GraphQLErrorPathSegment::String("a".to_string()),
854 GraphQLErrorPathSegment::Index(0),
855 ],
856 },
857 GraphQLErrorPath {
858 segments: vec![
859 GraphQLErrorPathSegment::String("b".to_string()),
860 GraphQLErrorPathSegment::Index(1),
861 ],
862 },
863 ],
864 );
865 let response_errors = vec![GraphQLError {
866 message: "Error 1".to_string(),
867 locations: None,
868 path: Some(GraphQLErrorPath {
869 segments: vec![
870 GraphQLErrorPathSegment::String("_entities".to_string()),
871 GraphQLErrorPathSegment::Index(0),
872 GraphQLErrorPathSegment::String("field1".to_string()),
873 ],
874 }),
875 extensions: GraphQLErrorExtensions::default(),
876 }];
877 ctx.handle_errors(
878 "subgraph_a",
879 None,
880 Some(response_errors),
881 Some(entity_index_error_map),
882 );
883 assert_eq!(ctx.errors.len(), 2);
884 assert_eq!(ctx.errors[0].message, "Error 1");
885 assert_eq!(
886 ctx.errors[0].path.as_ref().unwrap().segments,
887 vec![
888 GraphQLErrorPathSegment::String("a".to_string()),
889 GraphQLErrorPathSegment::Index(0),
890 GraphQLErrorPathSegment::String("field1".to_string())
891 ]
892 );
893 assert_eq!(ctx.errors[1].message, "Error 1");
894 assert_eq!(
895 ctx.errors[1].path.as_ref().unwrap().segments,
896 vec![
897 GraphQLErrorPathSegment::String("b".to_string()),
898 GraphQLErrorPathSegment::Index(1),
899 GraphQLErrorPathSegment::String("field1".to_string())
900 ]
901 );
902 }
903}