1use std::collections::{BTreeSet, HashMap};
2
3use bytes::{BufMut, Bytes};
4use futures::{future::BoxFuture, stream::FuturesUnordered, StreamExt};
5use hive_router_query_planner::planner::plan_nodes::{
6 ConditionNode, FetchNode, FetchRewrite, FlattenNode, FlattenNodePath, ParallelNode, PlanNode,
7 QueryPlan, SequenceNode,
8};
9use http::HeaderMap;
10use serde::Deserialize;
11use sonic_rs::ValueRef;
12
13use crate::{
14 context::ExecutionContext,
15 execution::{
16 client_request_details::ClientRequestDetails,
17 error::{IntoPlanExecutionError, LazyPlanContext, PlanExecutionError},
18 jwt_forward::JwtAuthForwardingPlan,
19 rewrites::FetchRewriteExt,
20 },
21 executors::{
22 common::{HttpExecutionResponse, SubgraphExecutionRequest},
23 map::SubgraphExecutorMap,
24 },
25 headers::{
26 plan::HeaderRulesPlan,
27 request::modify_subgraph_request_headers,
28 response::{apply_subgraph_response_headers, modify_client_response_headers},
29 },
30 introspection::{
31 resolve::{resolve_introspection, IntrospectionContext},
32 schema::SchemaMetadata,
33 },
34 projection::{
35 plan::FieldProjectionPlan,
36 request::{project_requires, RequestProjectionContext},
37 response::project_by_operation,
38 },
39 response::{
40 graphql_error::{GraphQLError, GraphQLErrorExtensions, GraphQLErrorPath},
41 merge::deep_merge,
42 subgraph_response::SubgraphResponse,
43 value::Value,
44 },
45 utils::{
46 consts::{CLOSE_BRACKET, OPEN_BRACKET},
47 traverse::{traverse_and_callback, traverse_and_callback_mut},
48 },
49};
50
/// Inputs consumed by [`execute_query_plan`] to run a single operation.
///
/// Most fields are borrowed for `'exec`; `extensions` and `initial_errors`
/// are owned by the context.
pub struct QueryPlanExecutionContext<'exec, 'req> {
    /// Plan whose root node, when present, is executed.
    pub query_plan: &'exec QueryPlan,
    /// Projection plan for the final response; when it is empty (and no
    /// introspection query is present) the response is seeded with `null`.
    pub projection_plan: &'exec Vec<FieldProjectionPlan>,
    /// Header rules handed to the executor for subgraph requests/responses.
    pub headers_plan: &'exec HeaderRulesPlan,
    /// Operation variables, handed to the executor and to the final projection.
    pub variable_values: &'exec Option<HashMap<String, sonic_rs::Value>>,
    /// Response extensions, passed by reference to the final projection.
    pub extensions: Option<HashMap<String, sonic_rs::Value>>,
    /// Details of the incoming client request, handed to the executor.
    pub client_request: &'exec ClientRequestDetails<'exec, 'req>,
    /// Supplies the optional introspection query and the schema metadata.
    pub introspection_context: &'exec IntrospectionContext<'exec, 'static>,
    /// Root operation type name; subgraph request deduplication is enabled
    /// only when this equals `"Query"`.
    pub operation_type_name: &'exec str,
    /// Subgraph executors used to perform fetches.
    pub executors: &'exec SubgraphExecutorMap,
    /// Optional JWT forwarding plan handed to the executor.
    pub jwt_auth_forwarding: &'exec Option<JwtAuthForwardingPlan>,
    /// Errors moved into the execution context before any node runs.
    pub initial_errors: Vec<GraphQLError>,
}
64
/// Result of [`execute_query_plan`].
pub struct PlanExecutionOutput {
    /// Response body produced by the final projection step.
    pub body: Vec<u8>,
    /// Headers to send back to the client.
    pub headers: HeaderMap,
    /// Number of errors collected during execution, counted before projection.
    pub error_count: usize,
}
70
71pub async fn execute_query_plan<'exec, 'req>(
72 ctx: QueryPlanExecutionContext<'exec, 'req>,
73) -> Result<PlanExecutionOutput, PlanExecutionError> {
74 let init_value = if let Some(introspection_query) = ctx.introspection_context.query {
75 resolve_introspection(introspection_query, ctx.introspection_context)
76 } else if ctx.projection_plan.is_empty() {
77 Value::Null
78 } else {
79 Value::Object(Vec::new())
80 };
81
82 let mut exec_ctx = ExecutionContext::new(ctx.query_plan, init_value, ctx.initial_errors);
83 let executor = Executor::new(
84 ctx.variable_values,
85 ctx.executors,
86 ctx.introspection_context.metadata,
87 ctx.client_request,
88 ctx.headers_plan,
89 ctx.jwt_auth_forwarding,
90 ctx.operation_type_name == "Query",
92 );
93
94 if ctx.query_plan.node.is_some() {
95 executor
96 .execute(&mut exec_ctx, ctx.query_plan.node.as_ref())
97 .await?;
98 }
99
100 let mut response_headers = HeaderMap::new();
101 modify_client_response_headers(exec_ctx.response_headers_aggregator, &mut response_headers)
102 .with_plan_context(LazyPlanContext {
103 subgraph_name: || None,
104 affected_path: || None,
105 })?;
106
107 let final_response = &exec_ctx.final_response;
108 let error_count = exec_ctx.errors.len(); let body = project_by_operation(
110 final_response,
111 exec_ctx.errors,
112 &ctx.extensions,
113 ctx.operation_type_name,
114 ctx.projection_plan,
115 ctx.variable_values,
116 exec_ctx.response_storage.estimate_final_response_size(),
117 ctx.introspection_context.metadata,
118 )
119 .with_plan_context(LazyPlanContext {
120 subgraph_name: || None,
121 affected_path: || None,
122 })?;
123
124 Ok(PlanExecutionOutput {
125 body,
126 headers: response_headers,
127 error_count,
128 })
129}
130
/// Executes plan nodes against subgraphs using borrowed, per-operation state.
pub struct Executor<'exec, 'req> {
    /// Operation variables; used to choose condition branches and to select
    /// the variables sent with each fetch.
    variable_values: &'exec Option<HashMap<String, sonic_rs::Value>>,
    /// Schema metadata; its `possible_types` are passed to rewrites and
    /// projections.
    schema_metadata: &'exec SchemaMetadata,
    /// Subgraph executors that perform the actual requests.
    executors: &'exec SubgraphExecutorMap,
    /// Client request details, passed to header rules and subgraph executors.
    client_request: &'exec ClientRequestDetails<'exec, 'req>,
    /// Header rules applied to subgraph requests and responses.
    headers_plan: &'exec HeaderRulesPlan,
    /// When present, its extension field is added to every subgraph request.
    jwt_forwarding_plan: &'exec Option<JwtAuthForwardingPlan>,
    /// Copied into the `dedupe` flag of every subgraph request.
    dedupe_subgraph_requests: bool,
}
140
/// A set of boxed futures that are polled together and drained as a group.
struct ConcurrencyScope<'exec, T> {
    /// Pending jobs; results are yielded by `FuturesUnordered` as they are
    /// produced rather than in insertion order.
    jobs: FuturesUnordered<BoxFuture<'exec, T>>,
}
144
145impl<'exec, T> ConcurrencyScope<'exec, T> {
146 fn new() -> Self {
147 Self {
148 jobs: FuturesUnordered::new(),
149 }
150 }
151
152 fn spawn(&mut self, future: BoxFuture<'exec, T>) {
153 self.jobs.push(future);
154 }
155
156 async fn join_all(mut self) -> Vec<T> {
157 let mut results = Vec::with_capacity(self.jobs.len());
158 while let Some(result) = self.jobs.next().await {
159 results.push(result);
160 }
161 results
162 }
163}
164
/// Raw body and headers of a subgraph response.
struct SubgraphOutput {
    /// Undecoded response body bytes.
    body: Bytes,
    /// Response headers returned alongside the body.
    headers: HeaderMap,
}
169
/// Outcome of executing a plain fetch node.
struct FetchJob {
    /// Id of the fetch node; used to look up its output rewrites.
    fetch_node_id: i64,
    /// Name of the subgraph the request was sent to.
    subgraph_name: String,
    /// Raw response received for the fetch.
    response: SubgraphOutput,
}
175
/// Outcome of executing the fetch wrapped by a flatten node.
struct FlattenFetchJob {
    /// Path of the flatten node; used to locate merge targets and to annotate
    /// errors.
    flatten_node_path: FlattenNodePath,
    /// Raw response received for the fetch.
    response: SubgraphOutput,
    /// Id of the wrapped fetch node; used to look up its output rewrites.
    fetch_node_id: i64,
    /// Name of the subgraph the request was sent to.
    subgraph_name: String,
    /// Hashes recorded while preparing the representations, consumed
    /// positionally while merging the response.
    representation_hashes: Vec<u64>,
    /// Maps a representation hash to the index used to look up the matching
    /// entity in the response.
    representation_hash_to_index: HashMap<u64, usize>,
}
184
/// Result of running one unit of work of the plan.
enum ExecutionJob {
    /// A plain fetch completed.
    Fetch(FetchJob),
    /// A fetch wrapped by a flatten node completed.
    FlattenFetch(FlattenFetchJob),
    /// Nothing was executed, so there is nothing to merge.
    None,
}
190
191impl From<ExecutionJob> for SubgraphOutput {
192 fn from(value: ExecutionJob) -> Self {
193 match value {
194 ExecutionJob::Fetch(j) => Self {
195 body: j.response.body,
196 headers: j.response.headers,
197 },
198 ExecutionJob::FlattenFetch(j) => Self {
199 body: j.response.body,
200 headers: j.response.headers,
201 },
202 ExecutionJob::None => Self {
203 body: Bytes::new(),
204 headers: HeaderMap::new(),
205 },
206 }
207 }
208}
209
210impl From<HttpExecutionResponse> for SubgraphOutput {
211 fn from(res: HttpExecutionResponse) -> Self {
212 Self {
213 body: res.body,
214 headers: res.headers,
215 }
216 }
217}
218
/// Request-side data computed for a flatten node before its fetch is sent.
struct PreparedFlattenData {
    /// Serialized representations, wrapped in an opening and closing bracket.
    representations: Vec<u8>,
    /// Hash of every non-null entity visited, in traversal order.
    representation_hashes: Vec<u64>,
    /// Maps the hash of each projected entity to its recorded index.
    representation_hash_to_index: HashMap<u64, usize>,
}
224
225impl<'exec, 'req> Executor<'exec, 'req> {
226 pub fn new(
227 variable_values: &'exec Option<HashMap<String, sonic_rs::Value>>,
228 executors: &'exec SubgraphExecutorMap,
229 schema_metadata: &'exec SchemaMetadata,
230 client_request: &'exec ClientRequestDetails<'exec, 'req>,
231 headers_plan: &'exec HeaderRulesPlan,
232 jwt_forwarding_plan: &'exec Option<JwtAuthForwardingPlan>,
233 dedupe_subgraph_requests: bool,
234 ) -> Self {
235 Executor {
236 variable_values,
237 executors,
238 schema_metadata,
239 client_request,
240 headers_plan,
241 dedupe_subgraph_requests,
242 jwt_forwarding_plan,
243 }
244 }
245
246 pub async fn execute(
247 &self,
248 ctx: &mut ExecutionContext<'exec>,
249 plan: Option<&PlanNode>,
250 ) -> Result<(), PlanExecutionError> {
251 match plan {
252 Some(PlanNode::Fetch(node)) => self.execute_fetch_wave(ctx, node).await,
253 Some(PlanNode::Parallel(node)) => self.execute_parallel_wave(ctx, node).await,
254 Some(PlanNode::Sequence(node)) => self.execute_sequence_wave(ctx, node).await,
255 Some(_) => Ok(()),
258 None => Ok(()),
260 }
261 }
262
263 async fn execute_fetch_wave(
264 &self,
265 ctx: &mut ExecutionContext<'exec>,
266 node: &FetchNode,
267 ) -> Result<(), PlanExecutionError> {
268 match self.execute_fetch_node(node, None).await {
269 Ok(result) => self.process_job_result(ctx, result),
270 Err(err) => {
271 self.log_error(&err);
272 ctx.errors.push(err.into());
273 Ok(())
274 }
275 }
276 }
277
278 async fn execute_sequence_wave(
279 &self,
280 ctx: &mut ExecutionContext<'exec>,
281 node: &SequenceNode,
282 ) -> Result<(), PlanExecutionError> {
283 for child in &node.nodes {
284 Box::pin(self.execute_plan_node(ctx, child)).await?;
285 }
286
287 Ok(())
288 }
289
290 async fn execute_parallel_wave(
291 &self,
292 ctx: &mut ExecutionContext<'exec>,
293 node: &ParallelNode,
294 ) -> Result<(), PlanExecutionError> {
295 let mut scope = ConcurrencyScope::new();
296
297 for child in &node.nodes {
298 let job_future = self.prepare_job_future(child, &ctx.final_response);
299 scope.spawn(job_future);
300 }
301
302 let results = scope.join_all().await;
303
304 for result in results {
305 match result {
306 Ok(job) => {
307 self.process_job_result(ctx, job)?;
308 }
309 Err(err) => {
310 self.log_error(&err);
311 ctx.errors.push(err.into())
312 }
313 }
314 }
315
316 Ok(())
317 }
318
319 async fn execute_plan_node(
320 &self,
321 ctx: &mut ExecutionContext<'exec>,
322 node: &PlanNode,
323 ) -> Result<(), PlanExecutionError> {
324 match node {
325 PlanNode::Fetch(fetch_node) => match self.execute_fetch_node(fetch_node, None).await {
326 Ok(job) => {
327 self.process_job_result(ctx, job)?;
328 }
329 Err(err) => {
330 self.log_error(&err);
331 ctx.errors.push(err.into());
332 }
333 },
334 PlanNode::Parallel(parallel_node) => {
335 self.execute_parallel_wave(ctx, parallel_node).await?;
336 }
337 PlanNode::Flatten(flatten_node) => {
338 match self.prepare_flatten_data(&ctx.final_response, flatten_node) {
339 Ok(Some(p)) => {
340 match self
341 .execute_flatten_fetch_node(
342 flatten_node,
343 Some(p.representations),
344 Some(p.representation_hashes),
345 Some(p.representation_hash_to_index),
346 )
347 .await
348 {
349 Ok(job) => {
350 self.process_job_result(ctx, job)?;
351 }
352 Err(err) => {
353 self.log_error(&err);
354 ctx.errors.push(err.into());
355 }
356 }
357 }
358 Ok(None) => { }
359 Err(err) => {
360 self.log_error(&err);
361 ctx.errors.push(err.into());
362 }
363 }
364 }
365 PlanNode::Sequence(sequence_node) => {
366 self.execute_sequence_wave(ctx, sequence_node).await?;
367 }
368 PlanNode::Condition(condition_node) => {
369 if let Some(node) =
370 condition_node_by_variables(condition_node, self.variable_values)
371 {
372 Box::pin(self.execute_plan_node(ctx, node)).await?;
373 }
374 }
375 _ => {}
377 }
378
379 Ok(())
380 }
381
382 fn prepare_job_future<'wave>(
383 &'wave self,
384 node: &'wave PlanNode,
385 final_response: &Value<'exec>,
386 ) -> BoxFuture<'wave, Result<ExecutionJob, PlanExecutionError>> {
387 match node {
388 PlanNode::Fetch(fetch_node) => Box::pin(self.execute_fetch_node(fetch_node, None)),
389 PlanNode::Flatten(flatten_node) => {
390 match self.prepare_flatten_data(final_response, flatten_node) {
391 Ok(Some(p)) => Box::pin(self.execute_flatten_fetch_node(
392 flatten_node,
393 Some(p.representations),
394 Some(p.representation_hashes),
395 Some(p.representation_hash_to_index),
396 )),
397 Ok(None) => Box::pin(async { Ok(ExecutionJob::None) }),
398 Err(e) => Box::pin(async move { Err(e) }),
399 }
400 }
401 PlanNode::Condition(node) => {
402 match condition_node_by_variables(node, self.variable_values) {
403 Some(node) => Box::pin(self.prepare_job_future(node, final_response)), None => Box::pin(async { Ok(ExecutionJob::None) }),
405 }
406 }
407 _ => Box::pin(async { Ok(ExecutionJob::None) }),
409 }
410 }
411
412 fn process_subgraph_response(
413 &self,
414 subgraph_name: &str,
415 ctx: &mut ExecutionContext<'exec>,
416 response_bytes: Bytes,
417 fetch_node_id: i64,
418 ) -> Option<(SubgraphResponse<'exec>, Option<&'exec Vec<FetchRewrite>>)> {
419 let idx = ctx.response_storage.add_response(response_bytes);
420 let bytes: &'exec [u8] =
427 unsafe { std::mem::transmute(ctx.response_storage.get_bytes(idx)) };
428
429 let output_rewrites: Option<&'exec Vec<FetchRewrite>> =
433 unsafe { std::mem::transmute(ctx.output_rewrites.get(fetch_node_id)) };
434
435 let mut deserializer = sonic_rs::Deserializer::from_slice(bytes);
436 let response = match SubgraphResponse::deserialize(&mut deserializer) {
437 Ok(response) => response,
438 Err(e) => {
439 let message = format!("Failed to deserialize subgraph response: {}", e);
440 let extensions = GraphQLErrorExtensions::new_from_code_and_service_name(
441 "SUBGRAPH_RESPONSE_DESERIALIZATION_FAILED",
442 subgraph_name,
443 );
444 let error = GraphQLError::from_message_and_extensions(message, extensions);
445
446 ctx.errors.push(error);
447 return None;
448 }
449 };
450
451 Some((response, output_rewrites))
452 }
453
454 fn process_job_result(
455 &self,
456 ctx: &mut ExecutionContext<'exec>,
457 job: ExecutionJob,
458 ) -> Result<(), PlanExecutionError> {
459 let _: () = match job {
460 ExecutionJob::Fetch(job) => {
461 apply_subgraph_response_headers(
462 self.headers_plan,
463 &job.subgraph_name,
464 &job.response.headers,
465 self.client_request,
466 &mut ctx.response_headers_aggregator,
467 )
468 .with_plan_context(LazyPlanContext {
469 subgraph_name: || Some(job.subgraph_name.clone()),
470 affected_path: || None,
471 })?;
472
473 if let Some((mut response, output_rewrites)) = self.process_subgraph_response(
474 job.subgraph_name.as_ref(),
475 ctx,
476 job.response.body,
477 job.fetch_node_id,
478 ) {
479 ctx.handle_errors(&job.subgraph_name, None, response.errors, None);
480 if let Some(output_rewrites) = output_rewrites {
481 for output_rewrite in output_rewrites {
482 output_rewrite
483 .rewrite(&self.schema_metadata.possible_types, &mut response.data);
484 }
485 }
486
487 deep_merge(&mut ctx.final_response, response.data);
488 }
489 }
490 ExecutionJob::FlattenFetch(job) => {
491 apply_subgraph_response_headers(
492 self.headers_plan,
493 &job.subgraph_name,
494 &job.response.headers,
495 self.client_request,
496 &mut ctx.response_headers_aggregator,
497 )
498 .with_plan_context(LazyPlanContext {
499 subgraph_name: || Some(job.subgraph_name.clone()),
500 affected_path: || Some(job.flatten_node_path.to_string()),
501 })?;
502
503 if let Some((mut response, output_rewrites)) = self.process_subgraph_response(
504 &job.subgraph_name,
505 ctx,
506 job.response.body,
507 job.fetch_node_id,
508 ) {
509 if let Some(mut entities) = response.data.take_entities() {
510 if let Some(output_rewrites) = output_rewrites {
511 for output_rewrite in output_rewrites {
512 for entity in &mut entities {
513 output_rewrite
514 .rewrite(&self.schema_metadata.possible_types, entity);
515 }
516 }
517 }
518
519 let mut index = 0;
520 let normalized_path = job.flatten_node_path.as_slice();
521 let initial_error_path = response
523 .errors
524 .as_ref()
525 .map(|_| GraphQLErrorPath::with_capacity(normalized_path.len() + 2));
526 let mut entity_index_error_map = response
527 .errors
528 .as_ref()
529 .map(|_| HashMap::with_capacity(entities.len()));
530 traverse_and_callback_mut(
531 &mut ctx.final_response,
532 normalized_path,
533 self.schema_metadata,
534 initial_error_path,
535 &mut |target, error_path| {
536 let hash = job.representation_hashes[index];
537 if let Some(entity_index) =
538 job.representation_hash_to_index.get(&hash)
539 {
540 if let (Some(error_path), Some(entity_index_error_map)) =
541 (error_path, entity_index_error_map.as_mut())
542 {
543 let error_paths = entity_index_error_map
544 .entry(entity_index)
545 .or_insert_with(Vec::new);
546 error_paths.push(error_path);
547 }
548 if let Some(entity) = entities.get(*entity_index) {
549 let new_val: Value<'_> =
553 unsafe { std::mem::transmute(entity.clone()) };
554 deep_merge(target, new_val);
555 }
556 }
557 index += 1;
558 },
559 );
560 ctx.handle_errors(
561 &job.subgraph_name,
562 Some(job.flatten_node_path.to_string()),
563 response.errors,
564 entity_index_error_map,
565 );
566 } else if let Some(errors) = response.errors {
567 let affected_path = job.flatten_node_path.to_string();
570 ctx.errors.extend(errors.into_iter().map(|e| {
571 e.add_subgraph_name(&job.subgraph_name)
572 .add_affected_path(affected_path.clone())
573 }));
574 }
575 }
576 }
577 ExecutionJob::None => {
578 }
580 };
581 Ok(())
582 }
583
584 fn prepare_flatten_data(
585 &self,
586 final_response: &Value<'exec>,
587 flatten_node: &FlattenNode,
588 ) -> Result<Option<PreparedFlattenData>, PlanExecutionError> {
589 let fetch_node = match flatten_node.node.as_ref() {
590 PlanNode::Fetch(fetch_node) => fetch_node,
591 _ => return Ok(None),
592 };
593 let requires_nodes = match fetch_node.requires.as_ref() {
594 Some(nodes) => nodes,
595 None => return Ok(None),
596 };
597
598 let mut index = 0;
599 let normalized_path = flatten_node.path.as_slice();
600 let mut filtered_representations = Vec::new();
601 filtered_representations.put(OPEN_BRACKET);
602 let proj_ctx = RequestProjectionContext::new(&self.schema_metadata.possible_types);
603 let mut representation_hashes: Vec<u64> = Vec::new();
604 let mut filtered_representations_hashes: HashMap<u64, usize> = HashMap::new();
605 let arena = bumpalo::Bump::new();
606
607 traverse_and_callback(
608 final_response,
609 normalized_path,
610 self.schema_metadata,
611 &mut |entity| {
612 let hash = entity.to_hash(&requires_nodes.items, proj_ctx.possible_types);
613
614 if !entity.is_null() {
615 representation_hashes.push(hash);
616 }
617
618 if filtered_representations_hashes.contains_key(&hash) {
619 return Ok::<(), PlanExecutionError>(());
620 }
621
622 let entity = if let Some(input_rewrites) = &fetch_node.input_rewrites {
623 let new_entity = arena.alloc(entity.clone());
624 for input_rewrite in input_rewrites {
625 input_rewrite.rewrite(&self.schema_metadata.possible_types, new_entity);
626 }
627 new_entity
628 } else {
629 entity
630 };
631
632 let is_projected = project_requires(
633 &proj_ctx,
634 &requires_nodes.items,
635 entity,
636 &mut filtered_representations,
637 filtered_representations_hashes.is_empty(),
638 None,
639 )
640 .with_plan_context(LazyPlanContext {
641 subgraph_name: || Some(fetch_node.service_name.clone()),
642 affected_path: || Some(flatten_node.path.to_string()),
643 })?;
644
645 if is_projected {
646 filtered_representations_hashes.insert(hash, index);
647 }
648
649 index += 1;
650
651 Ok(())
652 },
653 )?;
654 filtered_representations.put(CLOSE_BRACKET);
655
656 if filtered_representations_hashes.is_empty() {
657 return Ok(None);
658 }
659
660 Ok(Some(PreparedFlattenData {
661 representations: filtered_representations,
662 representation_hashes,
663 representation_hash_to_index: filtered_representations_hashes,
664 }))
665 }
666
667 async fn execute_flatten_fetch_node(
668 &self,
669 node: &FlattenNode,
670 representations: Option<Vec<u8>>,
671 representation_hashes: Option<Vec<u64>>,
672 filtered_representations_hashes: Option<HashMap<u64, usize>>,
673 ) -> Result<ExecutionJob, PlanExecutionError> {
674 Ok(match node.node.as_ref() {
675 PlanNode::Fetch(fetch_node) => ExecutionJob::FlattenFetch(FlattenFetchJob {
676 flatten_node_path: node.path.clone(),
677 response: self
678 .execute_fetch_node(fetch_node, representations)
679 .await?
680 .into(),
681 fetch_node_id: fetch_node.id,
682 subgraph_name: fetch_node.service_name.clone(),
683 representation_hashes: representation_hashes.unwrap_or_default(),
684 representation_hash_to_index: filtered_representations_hashes.unwrap_or_default(),
685 }),
686 _ => ExecutionJob::None,
687 })
688 }
689
690 async fn execute_fetch_node(
691 &self,
692 node: &FetchNode,
693 representations: Option<Vec<u8>>,
694 ) -> Result<ExecutionJob, PlanExecutionError> {
695 let mut headers_map = HeaderMap::new();
697 modify_subgraph_request_headers(
698 self.headers_plan,
699 &node.service_name,
700 self.client_request,
701 &mut headers_map,
702 )
703 .with_plan_context(LazyPlanContext {
704 subgraph_name: || Some(node.service_name.clone()),
705 affected_path: || None,
706 })?;
707 let variable_refs =
708 select_fetch_variables(self.variable_values, node.variable_usages.as_ref());
709
710 let mut subgraph_request = SubgraphExecutionRequest {
711 query: node.operation.document_str.as_str(),
712 dedupe: self.dedupe_subgraph_requests,
713 operation_name: node.operation_name.as_deref(),
714 variables: variable_refs,
715 representations,
716 headers: headers_map,
717 extensions: None,
718 };
719
720 if let Some(jwt_forwarding_plan) = &self.jwt_forwarding_plan {
721 subgraph_request.add_request_extensions_field(
722 jwt_forwarding_plan.extension_field_name.clone(),
723 jwt_forwarding_plan.extension_field_value.clone(),
724 );
725 }
726
727 Ok(ExecutionJob::Fetch(FetchJob {
728 fetch_node_id: node.id,
729 subgraph_name: node.service_name.clone(),
730 response: self
731 .executors
732 .execute(&node.service_name, subgraph_request, self.client_request)
733 .await
734 .into(),
735 }))
736 }
737
    /// Emits an error-level tracing event for a plan execution error, with the
    /// subgraph name reported by the error and the error itself as fields.
    fn log_error(&self, error: &PlanExecutionError) {
        tracing::error!(
            subgraph_name = error.subgraph_name(),
            error = error as &dyn std::error::Error,
            "Plan execution error"
        );
    }
745}
746
747fn condition_node_by_variables<'a>(
748 condition_node: &'a ConditionNode,
749 variable_values: &'a Option<HashMap<String, sonic_rs::Value>>,
750) -> Option<&'a PlanNode> {
751 let vars = variable_values.as_ref()?;
752 let value = vars.get(&condition_node.condition)?;
753 let condition_met = matches!(value.as_ref(), ValueRef::Bool(true));
754
755 if condition_met {
756 condition_node.if_clause.as_deref()
757 } else {
758 condition_node.else_clause.as_deref()
759 }
760}
761
762fn select_fetch_variables<'a>(
763 variable_values: &'a Option<HashMap<String, sonic_rs::Value>>,
764 variable_usages: Option<&BTreeSet<String>>,
765) -> Option<HashMap<&'a str, &'a sonic_rs::Value>> {
766 let values = variable_values.as_ref()?;
767
768 variable_usages.map(|variable_usages| {
769 variable_usages
770 .iter()
771 .filter_map(|var_name| {
772 values
773 .get_key_value(var_name.as_str())
774 .map(|(key, value)| (key.as_str(), value))
775 })
776 .collect()
777 })
778}
779
#[cfg(test)]
mod tests {
    use crate::{
        context::ExecutionContext,
        response::graphql_error::{GraphQLErrorExtensions, GraphQLErrorPath},
    };

    use super::select_fetch_variables;
    use sonic_rs::Value;
    use std::collections::{BTreeSet, HashMap};

    /// Parses the decimal rendering of `n` into a JSON value.
    fn value_from_number(n: i32) -> Value {
        let rendered = n.to_string();
        sonic_rs::from_str(&rendered).unwrap()
    }

    /// Only variables named in the usage set are selected.
    #[test]
    fn select_fetch_variables_only_used_variables() {
        let variable_values = Some(HashMap::from([
            ("used".to_string(), value_from_number(1)),
            ("unused".to_string(), value_from_number(2)),
        ]));
        let usages = BTreeSet::from(["used".to_string()]);

        let selected = select_fetch_variables(&variable_values, Some(&usages)).unwrap();

        assert_eq!(selected.len(), 1);
        assert!(selected.contains_key("used"));
        assert!(!selected.contains_key("unused"));
    }

    /// Usage names without a provided value are skipped.
    #[test]
    fn select_fetch_variables_ignores_missing_usage_entries() {
        let variable_values = Some(HashMap::from([(
            "present".to_string(),
            value_from_number(3),
        )]));
        let usages = BTreeSet::from(["present".to_string(), "missing".to_string()]);

        let selected = select_fetch_variables(&variable_values, Some(&usages)).unwrap();

        assert_eq!(selected.len(), 1);
        assert!(selected.contains_key("present"));
        assert!(!selected.contains_key("missing"));
    }

    /// Without a usage set nothing is selected at all.
    #[test]
    fn select_fetch_variables_for_no_usage_entries() {
        let variable_values = Some(HashMap::from([
            ("unused_1".to_string(), value_from_number(1)),
            ("unused_2".to_string(), value_from_number(2)),
        ]));

        let selected = select_fetch_variables(&variable_values, None);

        assert!(selected.is_none());
    }

    /// An error reported for `_entities[0]` is emitted once per response path
    /// recorded for that entity, with the `_entities` prefix replaced.
    #[test]
    fn normalize_entity_errors_correctly() {
        use crate::response::graphql_error::{GraphQLError, GraphQLErrorPathSegment};

        let field = |name: &str| GraphQLErrorPathSegment::String(name.to_string());

        let mut ctx = ExecutionContext::default();

        let mut entity_index_error_map: HashMap<&usize, Vec<GraphQLErrorPath>> = HashMap::new();
        entity_index_error_map.insert(
            &0,
            vec![
                GraphQLErrorPath {
                    segments: vec![field("a"), GraphQLErrorPathSegment::Index(0)],
                },
                GraphQLErrorPath {
                    segments: vec![field("b"), GraphQLErrorPathSegment::Index(1)],
                },
            ],
        );

        let response_errors = vec![GraphQLError {
            message: "Error 1".to_string(),
            locations: None,
            path: Some(GraphQLErrorPath {
                segments: vec![
                    field("_entities"),
                    GraphQLErrorPathSegment::Index(0),
                    field("field1"),
                ],
            }),
            extensions: GraphQLErrorExtensions::default(),
        }];

        ctx.handle_errors(
            "subgraph_a",
            None,
            Some(response_errors),
            Some(entity_index_error_map),
        );

        assert_eq!(ctx.errors.len(), 2);

        let expected_prefixes = [("a", 0), ("b", 1)];
        for (error, (parent, position)) in ctx.errors.iter().zip(expected_prefixes) {
            assert_eq!(error.message, "Error 1");
            assert_eq!(
                error.path.as_ref().unwrap().segments,
                vec![
                    field(parent),
                    GraphQLErrorPathSegment::Index(position),
                    field("field1")
                ]
            );
        }
    }
}