1use std::collections::{BTreeSet, HashMap};
2
3use bytes::{BufMut, Bytes};
4use futures::{future::BoxFuture, stream::FuturesUnordered, StreamExt};
5use hive_router_query_planner::planner::plan_nodes::{
6 ConditionNode, FetchNode, FetchRewrite, FlattenNode, FlattenNodePath, ParallelNode, PlanNode,
7 QueryPlan, SequenceNode,
8};
9use http::HeaderMap;
10use serde::Deserialize;
11use sonic_rs::ValueRef;
12
13use crate::{
14 context::ExecutionContext,
15 execution::{
16 client_request_details::ClientRequestDetails,
17 error::{IntoPlanExecutionError, LazyPlanContext, PlanExecutionError},
18 jwt_forward::JwtAuthForwardingPlan,
19 rewrites::FetchRewriteExt,
20 },
21 executors::{
22 common::{HttpExecutionResponse, SubgraphExecutionRequest},
23 map::SubgraphExecutorMap,
24 },
25 headers::{
26 plan::HeaderRulesPlan,
27 request::modify_subgraph_request_headers,
28 response::{apply_subgraph_response_headers, modify_client_response_headers},
29 },
30 introspection::{
31 resolve::{resolve_introspection, IntrospectionContext},
32 schema::SchemaMetadata,
33 },
34 projection::{
35 plan::FieldProjectionPlan,
36 request::{project_requires, RequestProjectionContext},
37 response::project_by_operation,
38 },
39 response::{
40 graphql_error::{GraphQLError, GraphQLErrorExtensions, GraphQLErrorPath},
41 merge::deep_merge,
42 subgraph_response::SubgraphResponse,
43 value::Value,
44 },
45 utils::{
46 consts::{CLOSE_BRACKET, OPEN_BRACKET},
47 traverse::{traverse_and_callback, traverse_and_callback_mut},
48 },
49};
50
51pub struct QueryPlanExecutionContext<'exec, 'req> {
52 pub query_plan: &'exec QueryPlan,
53 pub projection_plan: &'exec Vec<FieldProjectionPlan>,
54 pub headers_plan: &'exec HeaderRulesPlan,
55 pub variable_values: &'exec Option<HashMap<String, sonic_rs::Value>>,
56 pub extensions: Option<HashMap<String, sonic_rs::Value>>,
57 pub client_request: &'exec ClientRequestDetails<'exec, 'req>,
58 pub introspection_context: &'exec IntrospectionContext<'exec, 'static>,
59 pub operation_type_name: &'exec str,
60 pub executors: &'exec SubgraphExecutorMap,
61 pub jwt_auth_forwarding: &'exec Option<JwtAuthForwardingPlan>,
62 pub initial_errors: Vec<GraphQLError>,
63}
64
65pub struct PlanExecutionOutput {
66 pub body: Vec<u8>,
67 pub headers: HeaderMap,
68 pub error_count: usize,
69}
70
71pub async fn execute_query_plan<'exec, 'req>(
72 ctx: QueryPlanExecutionContext<'exec, 'req>,
73) -> Result<PlanExecutionOutput, PlanExecutionError> {
74 let init_value = if let Some(introspection_query) = ctx.introspection_context.query {
75 resolve_introspection(introspection_query, ctx.introspection_context)
76 } else if ctx.projection_plan.is_empty() {
77 Value::Null
78 } else {
79 Value::Object(Vec::new())
80 };
81
82 let mut exec_ctx = ExecutionContext::new(ctx.query_plan, init_value, ctx.initial_errors);
83 let executor = Executor::new(
84 ctx.variable_values,
85 ctx.executors,
86 ctx.introspection_context.metadata,
87 ctx.client_request,
88 ctx.headers_plan,
89 ctx.jwt_auth_forwarding,
90 ctx.operation_type_name == "Query",
92 );
93
94 if ctx.query_plan.node.is_some() {
95 executor
96 .execute(&mut exec_ctx, ctx.query_plan.node.as_ref())
97 .await?;
98 }
99
100 let mut response_headers = HeaderMap::new();
101 modify_client_response_headers(exec_ctx.response_headers_aggregator, &mut response_headers)
102 .with_plan_context(LazyPlanContext {
103 subgraph_name: || None,
104 affected_path: || None,
105 })?;
106
107 let final_response = &exec_ctx.final_response;
108 let error_count = exec_ctx.errors.len(); let body = project_by_operation(
110 final_response,
111 exec_ctx.errors,
112 &ctx.extensions,
113 ctx.operation_type_name,
114 ctx.projection_plan,
115 ctx.variable_values,
116 exec_ctx.response_storage.estimate_final_response_size(),
117 )
118 .with_plan_context(LazyPlanContext {
119 subgraph_name: || None,
120 affected_path: || None,
121 })?;
122
123 Ok(PlanExecutionOutput {
124 body,
125 headers: response_headers,
126 error_count,
127 })
128}
129
130pub struct Executor<'exec, 'req> {
131 variable_values: &'exec Option<HashMap<String, sonic_rs::Value>>,
132 schema_metadata: &'exec SchemaMetadata,
133 executors: &'exec SubgraphExecutorMap,
134 client_request: &'exec ClientRequestDetails<'exec, 'req>,
135 headers_plan: &'exec HeaderRulesPlan,
136 jwt_forwarding_plan: &'exec Option<JwtAuthForwardingPlan>,
137 dedupe_subgraph_requests: bool,
138}
139
140struct ConcurrencyScope<'exec, T> {
141 jobs: FuturesUnordered<BoxFuture<'exec, T>>,
142}
143
144impl<'exec, T> ConcurrencyScope<'exec, T> {
145 fn new() -> Self {
146 Self {
147 jobs: FuturesUnordered::new(),
148 }
149 }
150
151 fn spawn(&mut self, future: BoxFuture<'exec, T>) {
152 self.jobs.push(future);
153 }
154
155 async fn join_all(mut self) -> Vec<T> {
156 let mut results = Vec::with_capacity(self.jobs.len());
157 while let Some(result) = self.jobs.next().await {
158 results.push(result);
159 }
160 results
161 }
162}
163
164struct SubgraphOutput {
165 body: Bytes,
166 headers: HeaderMap,
167}
168
169struct FetchJob {
170 fetch_node_id: i64,
171 subgraph_name: String,
172 response: SubgraphOutput,
173}
174
175struct FlattenFetchJob {
176 flatten_node_path: FlattenNodePath,
177 response: SubgraphOutput,
178 fetch_node_id: i64,
179 subgraph_name: String,
180 representation_hashes: Vec<u64>,
181 representation_hash_to_index: HashMap<u64, usize>,
182}
183
184enum ExecutionJob {
185 Fetch(FetchJob),
186 FlattenFetch(FlattenFetchJob),
187 None,
188}
189
190impl From<ExecutionJob> for SubgraphOutput {
191 fn from(value: ExecutionJob) -> Self {
192 match value {
193 ExecutionJob::Fetch(j) => Self {
194 body: j.response.body,
195 headers: j.response.headers,
196 },
197 ExecutionJob::FlattenFetch(j) => Self {
198 body: j.response.body,
199 headers: j.response.headers,
200 },
201 ExecutionJob::None => Self {
202 body: Bytes::new(),
203 headers: HeaderMap::new(),
204 },
205 }
206 }
207}
208
209impl From<HttpExecutionResponse> for SubgraphOutput {
210 fn from(res: HttpExecutionResponse) -> Self {
211 Self {
212 body: res.body,
213 headers: res.headers,
214 }
215 }
216}
217
/// Request data assembled by `prepare_flatten_data` for a flatten fetch.
struct PreparedFlattenData {
    /// Serialized representations: bracket-delimited bytes written by
    /// `project_requires`.
    representations: Vec<u8>,
    /// Hash of each non-null entity visited, in visiting order.
    representation_hashes: Vec<u64>,
    /// Maps the hash of each projected entity to the index recorded for it.
    representation_hash_to_index: HashMap<u64, usize>,
}
223
224impl<'exec, 'req> Executor<'exec, 'req> {
225 pub fn new(
226 variable_values: &'exec Option<HashMap<String, sonic_rs::Value>>,
227 executors: &'exec SubgraphExecutorMap,
228 schema_metadata: &'exec SchemaMetadata,
229 client_request: &'exec ClientRequestDetails<'exec, 'req>,
230 headers_plan: &'exec HeaderRulesPlan,
231 jwt_forwarding_plan: &'exec Option<JwtAuthForwardingPlan>,
232 dedupe_subgraph_requests: bool,
233 ) -> Self {
234 Executor {
235 variable_values,
236 executors,
237 schema_metadata,
238 client_request,
239 headers_plan,
240 dedupe_subgraph_requests,
241 jwt_forwarding_plan,
242 }
243 }
244
245 pub async fn execute(
246 &self,
247 ctx: &mut ExecutionContext<'exec>,
248 plan: Option<&PlanNode>,
249 ) -> Result<(), PlanExecutionError> {
250 match plan {
251 Some(PlanNode::Fetch(node)) => self.execute_fetch_wave(ctx, node).await,
252 Some(PlanNode::Parallel(node)) => self.execute_parallel_wave(ctx, node).await,
253 Some(PlanNode::Sequence(node)) => self.execute_sequence_wave(ctx, node).await,
254 Some(_) => Ok(()),
257 None => Ok(()),
259 }
260 }
261
262 async fn execute_fetch_wave(
263 &self,
264 ctx: &mut ExecutionContext<'exec>,
265 node: &FetchNode,
266 ) -> Result<(), PlanExecutionError> {
267 match self.execute_fetch_node(node, None).await {
268 Ok(result) => self.process_job_result(ctx, result),
269 Err(err) => {
270 self.log_error(&err);
271 ctx.errors.push(err.into());
272 Ok(())
273 }
274 }
275 }
276
277 async fn execute_sequence_wave(
278 &self,
279 ctx: &mut ExecutionContext<'exec>,
280 node: &SequenceNode,
281 ) -> Result<(), PlanExecutionError> {
282 for child in &node.nodes {
283 Box::pin(self.execute_plan_node(ctx, child)).await?;
284 }
285
286 Ok(())
287 }
288
289 async fn execute_parallel_wave(
290 &self,
291 ctx: &mut ExecutionContext<'exec>,
292 node: &ParallelNode,
293 ) -> Result<(), PlanExecutionError> {
294 let mut scope = ConcurrencyScope::new();
295
296 for child in &node.nodes {
297 let job_future = self.prepare_job_future(child, &ctx.final_response);
298 scope.spawn(job_future);
299 }
300
301 let results = scope.join_all().await;
302
303 for result in results {
304 match result {
305 Ok(job) => {
306 self.process_job_result(ctx, job)?;
307 }
308 Err(err) => {
309 self.log_error(&err);
310 ctx.errors.push(err.into())
311 }
312 }
313 }
314
315 Ok(())
316 }
317
318 async fn execute_plan_node(
319 &self,
320 ctx: &mut ExecutionContext<'exec>,
321 node: &PlanNode,
322 ) -> Result<(), PlanExecutionError> {
323 match node {
324 PlanNode::Fetch(fetch_node) => match self.execute_fetch_node(fetch_node, None).await {
325 Ok(job) => {
326 self.process_job_result(ctx, job)?;
327 }
328 Err(err) => {
329 self.log_error(&err);
330 ctx.errors.push(err.into());
331 }
332 },
333 PlanNode::Parallel(parallel_node) => {
334 self.execute_parallel_wave(ctx, parallel_node).await?;
335 }
336 PlanNode::Flatten(flatten_node) => {
337 match self.prepare_flatten_data(&ctx.final_response, flatten_node) {
338 Ok(Some(p)) => {
339 match self
340 .execute_flatten_fetch_node(
341 flatten_node,
342 Some(p.representations),
343 Some(p.representation_hashes),
344 Some(p.representation_hash_to_index),
345 )
346 .await
347 {
348 Ok(job) => {
349 self.process_job_result(ctx, job)?;
350 }
351 Err(err) => {
352 self.log_error(&err);
353 ctx.errors.push(err.into());
354 }
355 }
356 }
357 Ok(None) => { }
358 Err(err) => {
359 self.log_error(&err);
360 ctx.errors.push(err.into());
361 }
362 }
363 }
364 PlanNode::Sequence(sequence_node) => {
365 self.execute_sequence_wave(ctx, sequence_node).await?;
366 }
367 PlanNode::Condition(condition_node) => {
368 if let Some(node) =
369 condition_node_by_variables(condition_node, self.variable_values)
370 {
371 Box::pin(self.execute_plan_node(ctx, node)).await?;
372 }
373 }
374 _ => {}
376 }
377
378 Ok(())
379 }
380
381 fn prepare_job_future<'wave>(
382 &'wave self,
383 node: &'wave PlanNode,
384 final_response: &Value<'exec>,
385 ) -> BoxFuture<'wave, Result<ExecutionJob, PlanExecutionError>> {
386 match node {
387 PlanNode::Fetch(fetch_node) => Box::pin(self.execute_fetch_node(fetch_node, None)),
388 PlanNode::Flatten(flatten_node) => {
389 match self.prepare_flatten_data(final_response, flatten_node) {
390 Ok(Some(p)) => Box::pin(self.execute_flatten_fetch_node(
391 flatten_node,
392 Some(p.representations),
393 Some(p.representation_hashes),
394 Some(p.representation_hash_to_index),
395 )),
396 Ok(None) => Box::pin(async { Ok(ExecutionJob::None) }),
397 Err(e) => Box::pin(async move { Err(e) }),
398 }
399 }
400 PlanNode::Condition(node) => {
401 match condition_node_by_variables(node, self.variable_values) {
402 Some(node) => Box::pin(self.prepare_job_future(node, final_response)), None => Box::pin(async { Ok(ExecutionJob::None) }),
404 }
405 }
406 _ => Box::pin(async { Ok(ExecutionJob::None) }),
408 }
409 }
410
411 fn process_subgraph_response(
412 &self,
413 subgraph_name: &str,
414 ctx: &mut ExecutionContext<'exec>,
415 response_bytes: Bytes,
416 fetch_node_id: i64,
417 ) -> Option<(SubgraphResponse<'exec>, Option<&'exec Vec<FetchRewrite>>)> {
418 let idx = ctx.response_storage.add_response(response_bytes);
419 let bytes: &'exec [u8] =
426 unsafe { std::mem::transmute(ctx.response_storage.get_bytes(idx)) };
427
428 let output_rewrites: Option<&'exec Vec<FetchRewrite>> =
432 unsafe { std::mem::transmute(ctx.output_rewrites.get(fetch_node_id)) };
433
434 let mut deserializer = sonic_rs::Deserializer::from_slice(bytes);
435 let response = match SubgraphResponse::deserialize(&mut deserializer) {
436 Ok(response) => response,
437 Err(e) => {
438 let message = format!("Failed to deserialize subgraph response: {}", e);
439 let extensions = GraphQLErrorExtensions::new_from_code_and_service_name(
440 "SUBGRAPH_RESPONSE_DESERIALIZATION_FAILED",
441 subgraph_name,
442 );
443 let error = GraphQLError::from_message_and_extensions(message, extensions);
444
445 ctx.errors.push(error);
446 return None;
447 }
448 };
449
450 Some((response, output_rewrites))
451 }
452
453 fn process_job_result(
454 &self,
455 ctx: &mut ExecutionContext<'exec>,
456 job: ExecutionJob,
457 ) -> Result<(), PlanExecutionError> {
458 let _: () = match job {
459 ExecutionJob::Fetch(job) => {
460 apply_subgraph_response_headers(
461 self.headers_plan,
462 &job.subgraph_name,
463 &job.response.headers,
464 self.client_request,
465 &mut ctx.response_headers_aggregator,
466 )
467 .with_plan_context(LazyPlanContext {
468 subgraph_name: || Some(job.subgraph_name.clone()),
469 affected_path: || None,
470 })?;
471
472 if let Some((mut response, output_rewrites)) = self.process_subgraph_response(
473 job.subgraph_name.as_ref(),
474 ctx,
475 job.response.body,
476 job.fetch_node_id,
477 ) {
478 ctx.handle_errors(&job.subgraph_name, None, response.errors, None);
479 if let Some(output_rewrites) = output_rewrites {
480 for output_rewrite in output_rewrites {
481 output_rewrite
482 .rewrite(&self.schema_metadata.possible_types, &mut response.data);
483 }
484 }
485
486 deep_merge(&mut ctx.final_response, response.data);
487 }
488 }
489 ExecutionJob::FlattenFetch(job) => {
490 apply_subgraph_response_headers(
491 self.headers_plan,
492 &job.subgraph_name,
493 &job.response.headers,
494 self.client_request,
495 &mut ctx.response_headers_aggregator,
496 )
497 .with_plan_context(LazyPlanContext {
498 subgraph_name: || Some(job.subgraph_name.clone()),
499 affected_path: || Some(job.flatten_node_path.to_string()),
500 })?;
501
502 if let Some((mut response, output_rewrites)) = self.process_subgraph_response(
503 &job.subgraph_name,
504 ctx,
505 job.response.body,
506 job.fetch_node_id,
507 ) {
508 if let Some(mut entities) = response.data.take_entities() {
509 if let Some(output_rewrites) = output_rewrites {
510 for output_rewrite in output_rewrites {
511 for entity in &mut entities {
512 output_rewrite
513 .rewrite(&self.schema_metadata.possible_types, entity);
514 }
515 }
516 }
517
518 let mut index = 0;
519 let normalized_path = job.flatten_node_path.as_slice();
520 let initial_error_path = response
522 .errors
523 .as_ref()
524 .map(|_| GraphQLErrorPath::with_capacity(normalized_path.len() + 2));
525 let mut entity_index_error_map = response
526 .errors
527 .as_ref()
528 .map(|_| HashMap::with_capacity(entities.len()));
529 traverse_and_callback_mut(
530 &mut ctx.final_response,
531 normalized_path,
532 self.schema_metadata,
533 initial_error_path,
534 &mut |target, error_path| {
535 let hash = job.representation_hashes[index];
536 if let Some(entity_index) =
537 job.representation_hash_to_index.get(&hash)
538 {
539 if let (Some(error_path), Some(entity_index_error_map)) =
540 (error_path, entity_index_error_map.as_mut())
541 {
542 let error_paths = entity_index_error_map
543 .entry(entity_index)
544 .or_insert_with(Vec::new);
545 error_paths.push(error_path);
546 }
547 if let Some(entity) = entities.get(*entity_index) {
548 let new_val: Value<'_> =
552 unsafe { std::mem::transmute(entity.clone()) };
553 deep_merge(target, new_val);
554 }
555 }
556 index += 1;
557 },
558 );
559 ctx.handle_errors(
560 &job.subgraph_name,
561 Some(job.flatten_node_path.to_string()),
562 response.errors,
563 entity_index_error_map,
564 );
565 } else if let Some(errors) = response.errors {
566 let affected_path = job.flatten_node_path.to_string();
569 ctx.errors.extend(errors.into_iter().map(|e| {
570 e.add_subgraph_name(&job.subgraph_name)
571 .add_affected_path(affected_path.clone())
572 }));
573 }
574 }
575 }
576 ExecutionJob::None => {
577 }
579 };
580 Ok(())
581 }
582
583 fn prepare_flatten_data(
584 &self,
585 final_response: &Value<'exec>,
586 flatten_node: &FlattenNode,
587 ) -> Result<Option<PreparedFlattenData>, PlanExecutionError> {
588 let fetch_node = match flatten_node.node.as_ref() {
589 PlanNode::Fetch(fetch_node) => fetch_node,
590 _ => return Ok(None),
591 };
592 let requires_nodes = match fetch_node.requires.as_ref() {
593 Some(nodes) => nodes,
594 None => return Ok(None),
595 };
596
597 let mut index = 0;
598 let normalized_path = flatten_node.path.as_slice();
599 let mut filtered_representations = Vec::new();
600 filtered_representations.put(OPEN_BRACKET);
601 let proj_ctx = RequestProjectionContext::new(&self.schema_metadata.possible_types);
602 let mut representation_hashes: Vec<u64> = Vec::new();
603 let mut filtered_representations_hashes: HashMap<u64, usize> = HashMap::new();
604 let arena = bumpalo::Bump::new();
605
606 traverse_and_callback(
607 final_response,
608 normalized_path,
609 self.schema_metadata,
610 &mut |entity| {
611 let hash = entity.to_hash(&requires_nodes.items, proj_ctx.possible_types);
612
613 if !entity.is_null() {
614 representation_hashes.push(hash);
615 }
616
617 if filtered_representations_hashes.contains_key(&hash) {
618 return Ok::<(), PlanExecutionError>(());
619 }
620
621 let entity = if let Some(input_rewrites) = &fetch_node.input_rewrites {
622 let new_entity = arena.alloc(entity.clone());
623 for input_rewrite in input_rewrites {
624 input_rewrite.rewrite(&self.schema_metadata.possible_types, new_entity);
625 }
626 new_entity
627 } else {
628 entity
629 };
630
631 let is_projected = project_requires(
632 &proj_ctx,
633 &requires_nodes.items,
634 entity,
635 &mut filtered_representations,
636 filtered_representations_hashes.is_empty(),
637 None,
638 )
639 .with_plan_context(LazyPlanContext {
640 subgraph_name: || Some(fetch_node.service_name.clone()),
641 affected_path: || Some(flatten_node.path.to_string()),
642 })?;
643
644 if is_projected {
645 filtered_representations_hashes.insert(hash, index);
646 }
647
648 index += 1;
649
650 Ok(())
651 },
652 )?;
653 filtered_representations.put(CLOSE_BRACKET);
654
655 if filtered_representations_hashes.is_empty() {
656 return Ok(None);
657 }
658
659 Ok(Some(PreparedFlattenData {
660 representations: filtered_representations,
661 representation_hashes,
662 representation_hash_to_index: filtered_representations_hashes,
663 }))
664 }
665
666 async fn execute_flatten_fetch_node(
667 &self,
668 node: &FlattenNode,
669 representations: Option<Vec<u8>>,
670 representation_hashes: Option<Vec<u64>>,
671 filtered_representations_hashes: Option<HashMap<u64, usize>>,
672 ) -> Result<ExecutionJob, PlanExecutionError> {
673 Ok(match node.node.as_ref() {
674 PlanNode::Fetch(fetch_node) => ExecutionJob::FlattenFetch(FlattenFetchJob {
675 flatten_node_path: node.path.clone(),
676 response: self
677 .execute_fetch_node(fetch_node, representations)
678 .await?
679 .into(),
680 fetch_node_id: fetch_node.id,
681 subgraph_name: fetch_node.service_name.clone(),
682 representation_hashes: representation_hashes.unwrap_or_default(),
683 representation_hash_to_index: filtered_representations_hashes.unwrap_or_default(),
684 }),
685 _ => ExecutionJob::None,
686 })
687 }
688
689 async fn execute_fetch_node(
690 &self,
691 node: &FetchNode,
692 representations: Option<Vec<u8>>,
693 ) -> Result<ExecutionJob, PlanExecutionError> {
694 let mut headers_map = HeaderMap::new();
696 modify_subgraph_request_headers(
697 self.headers_plan,
698 &node.service_name,
699 self.client_request,
700 &mut headers_map,
701 )
702 .with_plan_context(LazyPlanContext {
703 subgraph_name: || Some(node.service_name.clone()),
704 affected_path: || None,
705 })?;
706 let variable_refs =
707 select_fetch_variables(self.variable_values, node.variable_usages.as_ref());
708
709 let mut subgraph_request = SubgraphExecutionRequest {
710 query: node.operation.document_str.as_str(),
711 dedupe: self.dedupe_subgraph_requests,
712 operation_name: node.operation_name.as_deref(),
713 variables: variable_refs,
714 representations,
715 headers: headers_map,
716 extensions: None,
717 };
718
719 if let Some(jwt_forwarding_plan) = &self.jwt_forwarding_plan {
720 subgraph_request.add_request_extensions_field(
721 jwt_forwarding_plan.extension_field_name.clone(),
722 jwt_forwarding_plan.extension_field_value.clone(),
723 );
724 }
725
726 Ok(ExecutionJob::Fetch(FetchJob {
727 fetch_node_id: node.id,
728 subgraph_name: node.service_name.clone(),
729 response: self
730 .executors
731 .execute(&node.service_name, subgraph_request, self.client_request)
732 .await
733 .into(),
734 }))
735 }
736
737 fn log_error(&self, error: &PlanExecutionError) {
738 tracing::error!(
739 subgraph_name = error.subgraph_name(),
740 error = error as &dyn std::error::Error,
741 "Plan execution error"
742 );
743 }
744}
745
746fn condition_node_by_variables<'a>(
747 condition_node: &'a ConditionNode,
748 variable_values: &'a Option<HashMap<String, sonic_rs::Value>>,
749) -> Option<&'a PlanNode> {
750 let vars = variable_values.as_ref()?;
751 let value = vars.get(&condition_node.condition)?;
752 let condition_met = matches!(value.as_ref(), ValueRef::Bool(true));
753
754 if condition_met {
755 condition_node.if_clause.as_deref()
756 } else {
757 condition_node.else_clause.as_deref()
758 }
759}
760
761fn select_fetch_variables<'a>(
762 variable_values: &'a Option<HashMap<String, sonic_rs::Value>>,
763 variable_usages: Option<&BTreeSet<String>>,
764) -> Option<HashMap<&'a str, &'a sonic_rs::Value>> {
765 let values = variable_values.as_ref()?;
766
767 variable_usages.map(|variable_usages| {
768 variable_usages
769 .iter()
770 .filter_map(|var_name| {
771 values
772 .get_key_value(var_name.as_str())
773 .map(|(key, value)| (key.as_str(), value))
774 })
775 .collect()
776 })
777}
778
#[cfg(test)]
mod tests {
    use crate::{
        context::ExecutionContext,
        response::graphql_error::{GraphQLErrorExtensions, GraphQLErrorPath},
    };

    use super::select_fetch_variables;
    use sonic_rs::Value;
    use std::collections::{BTreeSet, HashMap};

    /// Parses the decimal text of `n` into a `sonic_rs` value.
    fn value_from_number(n: i32) -> Value {
        let text = n.to_string();
        sonic_rs::from_str(&text).unwrap()
    }

    /// Only variables listed in the usage set are selected.
    #[test]
    fn select_fetch_variables_only_used_variables() {
        let variable_values = Some(HashMap::from([
            ("used".to_string(), value_from_number(1)),
            ("unused".to_string(), value_from_number(2)),
        ]));
        let usages = BTreeSet::from(["used".to_string()]);

        let selected = select_fetch_variables(&variable_values, Some(&usages)).unwrap();

        assert_eq!(selected.len(), 1);
        assert!(selected.contains_key("used"));
        assert!(!selected.contains_key("unused"));
    }

    /// Usage entries without a matching value are left out.
    #[test]
    fn select_fetch_variables_ignores_missing_usage_entries() {
        let variable_values = Some(HashMap::from([(
            "present".to_string(),
            value_from_number(3),
        )]));
        let usages = BTreeSet::from(["present".to_string(), "missing".to_string()]);

        let selected = select_fetch_variables(&variable_values, Some(&usages)).unwrap();

        assert_eq!(selected.len(), 1);
        assert!(selected.contains_key("present"));
        assert!(!selected.contains_key("missing"));
    }

    /// Without a usage set nothing is selected at all.
    #[test]
    fn select_fetch_variables_for_no_usage_entries() {
        let variable_values = Some(HashMap::from([
            ("unused_1".to_string(), value_from_number(1)),
            ("unused_2".to_string(), value_from_number(2)),
        ]));

        let selected = select_fetch_variables(&variable_values, None);

        assert!(selected.is_none());
    }

    /// An `_entities`-relative error is reported once per recorded target
    /// path, with the `_entities` prefix replaced by that path.
    #[test]
    fn normalize_entity_errors_correctly() {
        use crate::response::graphql_error::{GraphQLError, GraphQLErrorPathSegment};

        let key = |name: &str| GraphQLErrorPathSegment::String(name.to_string());

        let mut ctx = ExecutionContext::default();

        let mut entity_index_error_map: HashMap<&usize, Vec<GraphQLErrorPath>> = HashMap::new();
        entity_index_error_map.insert(
            &0,
            vec![
                GraphQLErrorPath {
                    segments: vec![key("a"), GraphQLErrorPathSegment::Index(0)],
                },
                GraphQLErrorPath {
                    segments: vec![key("b"), GraphQLErrorPathSegment::Index(1)],
                },
            ],
        );

        let response_errors = vec![GraphQLError {
            message: "Error 1".to_string(),
            locations: None,
            path: Some(GraphQLErrorPath {
                segments: vec![
                    key("_entities"),
                    GraphQLErrorPathSegment::Index(0),
                    key("field1"),
                ],
            }),
            extensions: GraphQLErrorExtensions::default(),
        }];

        ctx.handle_errors(
            "subgraph_a",
            None,
            Some(response_errors),
            Some(entity_index_error_map),
        );

        assert_eq!(ctx.errors.len(), 2);

        assert_eq!(ctx.errors[0].message, "Error 1");
        assert_eq!(
            ctx.errors[0].path.as_ref().unwrap().segments,
            vec![key("a"), GraphQLErrorPathSegment::Index(0), key("field1")]
        );

        assert_eq!(ctx.errors[1].message, "Error 1");
        assert_eq!(
            ctx.errors[1].path.as_ref().unwrap().segments,
            vec![key("b"), GraphQLErrorPathSegment::Index(1), key("field1")]
        );
    }
}
906}