1use std::collections::{BTreeSet, HashMap};
2
3use bytes::{BufMut, Bytes};
4use futures::{future::BoxFuture, stream::FuturesUnordered, StreamExt};
5use hive_router_query_planner::planner::plan_nodes::{
6 ConditionNode, FetchNode, FetchRewrite, FlattenNode, FlattenNodePath, ParallelNode, PlanNode,
7 QueryPlan, SequenceNode,
8};
9use http::HeaderMap;
10use sonic_rs::ValueRef;
11
12use crate::{
13 context::ExecutionContext,
14 execution::{
15 client_request_details::ClientRequestDetails,
16 error::{IntoPlanExecutionError, LazyPlanContext, PlanExecutionError},
17 jwt_forward::JwtAuthForwardingPlan,
18 rewrites::FetchRewriteExt,
19 },
20 executors::{common::SubgraphExecutionRequest, map::SubgraphExecutorMap},
21 headers::{
22 plan::HeaderRulesPlan,
23 request::modify_subgraph_request_headers,
24 response::{apply_subgraph_response_headers, modify_client_response_headers},
25 },
26 introspection::{
27 resolve::{resolve_introspection, IntrospectionContext},
28 schema::SchemaMetadata,
29 },
30 projection::{
31 plan::FieldProjectionPlan,
32 request::{project_requires, RequestProjectionContext},
33 response::project_by_operation,
34 },
35 response::{
36 graphql_error::{GraphQLError, GraphQLErrorPath},
37 merge::deep_merge,
38 subgraph_response::SubgraphResponse,
39 value::Value,
40 },
41 utils::{
42 consts::{CLOSE_BRACKET, OPEN_BRACKET},
43 traverse::{traverse_and_callback, traverse_and_callback_mut},
44 },
45};
46
/// Everything [`execute_query_plan`] needs to run a single client operation.
///
/// Borrowed fields live for `'exec`; `extensions` and `initial_errors` are owned
/// and consumed by the execution.
pub struct QueryPlanExecutionContext<'exec> {
    /// Query plan whose root node (if any) is executed.
    pub query_plan: &'exec QueryPlan,
    /// Projection plan passed to `project_by_operation`. When it is empty and there
    /// is no introspection query, the response data starts out as `null`.
    pub projection_plan: &'exec [FieldProjectionPlan],
    /// Header rules applied to subgraph requests and subgraph responses.
    pub headers_plan: &'exec HeaderRulesPlan,
    /// Client-supplied variables; used for condition nodes, for selecting the
    /// variables sent with each fetch, and for the final projection.
    pub variable_values: &'exec Option<HashMap<String, sonic_rs::Value>>,
    /// Response extensions, passed by reference to `project_by_operation`.
    pub extensions: Option<HashMap<String, sonic_rs::Value>>,
    /// Details of the incoming client request, passed to header handling and to
    /// the subgraph executors.
    pub client_request: &'exec ClientRequestDetails<'exec>,
    /// Supplies the optional introspection query (`query`) and the schema metadata
    /// (`metadata`) used throughout execution.
    pub introspection_context: &'exec IntrospectionContext<'exec, 'static>,
    /// Root operation type name; subgraph request deduplication is enabled only
    /// when it equals `"Query"`.
    pub operation_type_name: &'exec str,
    /// Subgraph executors used to send every fetch.
    pub executors: &'exec SubgraphExecutorMap,
    /// When present, its extension field is attached to every subgraph request.
    pub jwt_auth_forwarding: &'exec Option<JwtAuthForwardingPlan>,
    /// Errors that already exist before execution starts; they seed the
    /// execution context's error list.
    pub initial_errors: Vec<GraphQLError>,
}
60
/// Result of [`execute_query_plan`].
pub struct PlanExecutionOutput {
    /// Response body produced by `project_by_operation`.
    pub body: Vec<u8>,
    /// Client response headers filled in by `modify_client_response_headers`.
    pub headers: HeaderMap,
    /// Number of errors collected in the execution context before projection.
    pub error_count: usize,
}
66
67pub async fn execute_query_plan<'exec>(
68 ctx: QueryPlanExecutionContext<'exec>,
69) -> Result<PlanExecutionOutput, PlanExecutionError> {
70 let init_value = if let Some(introspection_query) = ctx.introspection_context.query {
71 resolve_introspection(introspection_query, ctx.introspection_context)
72 } else if ctx.projection_plan.is_empty() {
73 Value::Null
74 } else {
75 Value::Object(Vec::new())
76 };
77
78 let mut exec_ctx = ExecutionContext::new(ctx.query_plan, init_value, ctx.initial_errors);
79 let executor = Executor::new(
80 ctx.variable_values,
81 ctx.executors,
82 ctx.introspection_context.metadata,
83 ctx.client_request,
84 ctx.headers_plan,
85 ctx.jwt_auth_forwarding,
86 ctx.operation_type_name == "Query",
88 );
89
90 if ctx.query_plan.node.is_some() {
91 executor
92 .execute(&mut exec_ctx, ctx.query_plan.node.as_ref())
93 .await?;
94 }
95
96 let mut response_headers = HeaderMap::new();
97 modify_client_response_headers(exec_ctx.response_headers_aggregator, &mut response_headers)
98 .with_plan_context(LazyPlanContext {
99 subgraph_name: || None,
100 affected_path: || None,
101 })?;
102
103 let final_response = &exec_ctx.final_response;
104 let error_count = exec_ctx.errors.len(); let body = project_by_operation(
106 final_response,
107 exec_ctx.errors,
108 &ctx.extensions,
109 ctx.operation_type_name,
110 ctx.projection_plan,
111 ctx.variable_values,
112 exec_ctx.response_storage.estimate_final_response_size(),
113 ctx.introspection_context.metadata,
114 )
115 .with_plan_context(LazyPlanContext {
116 subgraph_name: || None,
117 affected_path: || None,
118 })?;
119
120 Ok(PlanExecutionOutput {
121 body,
122 headers: response_headers,
123 error_count,
124 })
125}
126
/// Walks a query plan and talks to subgraphs on behalf of one client operation.
pub struct Executor<'exec> {
    /// Client-supplied variables; used to pick condition branches and to select
    /// the variables sent with each fetch.
    variable_values: &'exec Option<HashMap<String, sonic_rs::Value>>,
    /// Schema metadata; its `possible_types` are used for rewrites, entity hashing
    /// and `requires` projection.
    schema_metadata: &'exec SchemaMetadata,
    /// Subgraph executors used to send every fetch.
    executors: &'exec SubgraphExecutorMap,
    /// Details of the incoming client request, passed to header handling and to
    /// the subgraph executors.
    client_request: &'exec ClientRequestDetails<'exec>,
    /// Header rules applied to subgraph requests and subgraph responses.
    headers_plan: &'exec HeaderRulesPlan,
    /// When present, its extension field name/value pair is added to every
    /// subgraph request.
    jwt_forwarding_plan: &'exec Option<JwtAuthForwardingPlan>,
    /// Copied into the `dedupe` flag of every subgraph request.
    dedupe_subgraph_requests: bool,
}
136
/// A group of boxed futures that are driven together.
///
/// Results are collected in whatever order the underlying `FuturesUnordered`
/// yields them, which is not necessarily the order in which they were spawned.
struct ConcurrencyScope<'exec, T> {
    /// Futures registered through `spawn` and drained by `join_all`.
    jobs: FuturesUnordered<BoxFuture<'exec, T>>,
}
140
141impl<'exec, T> ConcurrencyScope<'exec, T> {
142 fn new() -> Self {
143 Self {
144 jobs: FuturesUnordered::new(),
145 }
146 }
147
148 fn spawn(&mut self, future: BoxFuture<'exec, T>) {
149 self.jobs.push(future);
150 }
151
152 async fn join_all(mut self) -> Vec<T> {
153 let mut results = Vec::with_capacity(self.jobs.len());
154 while let Some(result) = self.jobs.next().await {
155 results.push(result);
156 }
157 results
158 }
159}
160
/// Outcome of a plain fetch, waiting to be merged into the final response.
struct FetchJob<'exec> {
    /// `id` of the fetch node that produced this response; used to look up its
    /// output rewrites.
    fetch_node_id: i64,
    /// Name of the subgraph that was called.
    subgraph_name: &'exec str,
    /// Response returned by the subgraph executor.
    response: SubgraphResponse<'exec>,
}
166
/// Outcome of an entity fetch wrapped in a flatten node, waiting to be merged
/// into the entities found under the flatten path.
struct FlattenFetchJob<'exec> {
    /// Path of the flatten node; locates the merge targets in the final response.
    flatten_node_path: &'exec FlattenNodePath,
    /// Response returned by the subgraph executor.
    response: SubgraphResponse<'exec>,
    /// `id` of the inner fetch node; used to look up its output rewrites.
    fetch_node_id: i64,
    /// Name of the subgraph that was called.
    subgraph_name: &'exec str,
    /// Hashes recorded by `prepare_flatten_data`, one per non-null entity visited,
    /// in traversal order.
    representation_hashes: Vec<u64>,
    /// Maps a representation hash to the index recorded for it by
    /// `prepare_flatten_data`; used to index the entities returned by the subgraph.
    representation_hash_to_index: HashMap<u64, usize>,
}
175
/// Result of running one plan node, consumed by `Executor::process_job_result`.
enum ExecutionJob<'exec> {
    /// A plain fetch whose data is deep-merged into the final response.
    Fetch(FetchJob<'exec>),
    /// An entity fetch whose entities are merged into the flatten path targets.
    FlattenFetch(FlattenFetchJob<'exec>),
    /// The node produced nothing to merge.
    None,
}
181
/// Request data computed by `Executor::prepare_flatten_data` for an entity fetch.
struct PreparedFlattenData {
    /// Serialized representations: the bytes written by `project_requires`,
    /// enclosed between `OPEN_BRACKET` and `CLOSE_BRACKET`.
    representations: Vec<u8>,
    /// Hash of every non-null entity visited, in traversal order.
    representation_hashes: Vec<u64>,
    /// Maps the hash of each projected representation to the index recorded for it.
    representation_hash_to_index: HashMap<u64, usize>,
}
187
188impl<'exec> Executor<'exec> {
189 pub fn new(
190 variable_values: &'exec Option<HashMap<String, sonic_rs::Value>>,
191 executors: &'exec SubgraphExecutorMap,
192 schema_metadata: &'exec SchemaMetadata,
193 client_request: &'exec ClientRequestDetails<'exec>,
194 headers_plan: &'exec HeaderRulesPlan,
195 jwt_forwarding_plan: &'exec Option<JwtAuthForwardingPlan>,
196 dedupe_subgraph_requests: bool,
197 ) -> Self {
198 Executor {
199 variable_values,
200 executors,
201 schema_metadata,
202 client_request,
203 headers_plan,
204 dedupe_subgraph_requests,
205 jwt_forwarding_plan,
206 }
207 }
208
209 pub async fn execute(
210 &'exec self,
211 ctx: &mut ExecutionContext<'exec>,
212 plan: Option<&'exec PlanNode>,
213 ) -> Result<(), PlanExecutionError> {
214 match plan {
215 Some(PlanNode::Fetch(node)) => self.execute_fetch_wave(ctx, node).await,
216 Some(PlanNode::Parallel(node)) => self.execute_parallel_wave(ctx, node).await,
217 Some(PlanNode::Sequence(node)) => self.execute_sequence_wave(ctx, node).await,
218 Some(_) => Ok(()),
221 None => Ok(()),
223 }
224 }
225
226 async fn execute_fetch_wave(
227 &'exec self,
228 ctx: &mut ExecutionContext<'exec>,
229 node: &'exec FetchNode,
230 ) -> Result<(), PlanExecutionError> {
231 match self.execute_fetch_node(node, None).await {
232 Ok(result) => self.process_job_result(ctx, result),
233 Err(err) => {
234 self.log_error(&err);
235 ctx.errors.push(err.into());
236 Ok(())
237 }
238 }
239 }
240
241 async fn execute_sequence_wave(
242 &'exec self,
243 ctx: &mut ExecutionContext<'exec>,
244 node: &'exec SequenceNode,
245 ) -> Result<(), PlanExecutionError> {
246 for child in &node.nodes {
247 Box::pin(self.execute_plan_node(ctx, child)).await?;
248 }
249
250 Ok(())
251 }
252
253 async fn execute_parallel_wave(
254 &'exec self,
255 ctx: &mut ExecutionContext<'exec>,
256 node: &'exec ParallelNode,
257 ) -> Result<(), PlanExecutionError> {
258 let mut scope = ConcurrencyScope::new();
259
260 for child in &node.nodes {
261 let job_future = self.prepare_job_future(child, &ctx.final_response);
262 scope.spawn(job_future);
263 }
264
265 let results = scope.join_all().await;
266
267 for result in results {
268 match result {
269 Ok(job) => {
270 self.process_job_result(ctx, job)?;
271 }
272 Err(err) => {
273 self.log_error(&err);
274 ctx.errors.push(err.into())
275 }
276 }
277 }
278
279 Ok(())
280 }
281
282 async fn execute_plan_node(
283 &'exec self,
284 ctx: &mut ExecutionContext<'exec>,
285 node: &'exec PlanNode,
286 ) -> Result<(), PlanExecutionError> {
287 match node {
288 PlanNode::Fetch(fetch_node) => match self.execute_fetch_node(fetch_node, None).await {
289 Ok(job) => {
290 self.process_job_result(ctx, job)?;
291 }
292 Err(err) => {
293 self.log_error(&err);
294 ctx.errors.push(err.into());
295 }
296 },
297 PlanNode::Parallel(parallel_node) => {
298 self.execute_parallel_wave(ctx, parallel_node).await?;
299 }
300 PlanNode::Flatten(flatten_node) => {
301 match self.prepare_flatten_data(&ctx.final_response, flatten_node) {
302 Ok(Some(p)) => {
303 match self
304 .execute_flatten_fetch_node(
305 flatten_node,
306 Some(p.representations),
307 Some(p.representation_hashes),
308 Some(p.representation_hash_to_index),
309 )
310 .await
311 {
312 Ok(job) => {
313 self.process_job_result(ctx, job)?;
314 }
315 Err(err) => {
316 self.log_error(&err);
317 ctx.errors.push(err.into());
318 }
319 }
320 }
321 Ok(None) => { }
322 Err(err) => {
323 self.log_error(&err);
324 ctx.errors.push(err.into());
325 }
326 }
327 }
328 PlanNode::Sequence(sequence_node) => {
329 self.execute_sequence_wave(ctx, sequence_node).await?;
330 }
331 PlanNode::Condition(condition_node) => {
332 if let Some(node) =
333 condition_node_by_variables(condition_node, self.variable_values)
334 {
335 Box::pin(self.execute_plan_node(ctx, node)).await?;
336 }
337 }
338 _ => {}
340 }
341
342 Ok(())
343 }
344
345 fn prepare_job_future(
346 &'exec self,
347 node: &'exec PlanNode,
348 final_response: &Value<'exec>,
349 ) -> BoxFuture<'exec, Result<ExecutionJob<'exec>, PlanExecutionError>> {
350 match node {
351 PlanNode::Fetch(fetch_node) => Box::pin(self.execute_fetch_node(fetch_node, None)),
352 PlanNode::Flatten(flatten_node) => {
353 match self.prepare_flatten_data(final_response, flatten_node) {
354 Ok(Some(p)) => Box::pin(self.execute_flatten_fetch_node(
355 flatten_node,
356 Some(p.representations),
357 Some(p.representation_hashes),
358 Some(p.representation_hash_to_index),
359 )),
360 Ok(None) => Box::pin(async { Ok(ExecutionJob::None) }),
361 Err(e) => Box::pin(async move { Err(e) }),
362 }
363 }
364 PlanNode::Condition(node) => {
365 match condition_node_by_variables(node, self.variable_values) {
366 Some(node) => Box::pin(self.prepare_job_future(node, final_response)), None => Box::pin(async { Ok(ExecutionJob::None) }),
368 }
369 }
370 _ => Box::pin(async { Ok(ExecutionJob::None) }),
372 }
373 }
374
375 fn process_subgraph_response(
376 &self,
377 ctx: &mut ExecutionContext<'exec>,
378 response_bytes: Option<Bytes>,
379 fetch_node_id: i64,
380 ) -> Option<&'exec [FetchRewrite]> {
381 if let Some(response_bytes) = response_bytes {
382 ctx.response_storage.add_response(response_bytes);
383 }
384
385 ctx.output_rewrites.get(fetch_node_id)
386 }
387
388 fn process_job_result(
389 &self,
390 ctx: &mut ExecutionContext<'exec>,
391 job: ExecutionJob<'exec>,
392 ) -> Result<(), PlanExecutionError> {
393 match job {
394 ExecutionJob::Fetch(mut job) => {
395 if let Some(response_headers) = &job.response.headers {
396 apply_subgraph_response_headers(
397 self.headers_plan,
398 job.subgraph_name,
399 response_headers,
400 self.client_request,
401 &mut ctx.response_headers_aggregator,
402 )
403 .with_plan_context(LazyPlanContext {
404 subgraph_name: || Some(job.subgraph_name.into()),
405 affected_path: || None,
406 })?;
407 }
408
409 if let Some(output_rewrites) =
410 self.process_subgraph_response(ctx, job.response.bytes, job.fetch_node_id)
411 {
412 for output_rewrite in output_rewrites {
413 output_rewrite
414 .rewrite(&self.schema_metadata.possible_types, &mut job.response.data);
415 }
416 }
417
418 ctx.handle_errors(job.subgraph_name, None, job.response.errors, None);
419
420 deep_merge(&mut ctx.final_response, job.response.data);
421 }
422 ExecutionJob::FlattenFetch(mut job) => {
423 if let Some(response_headers) = &job.response.headers {
424 apply_subgraph_response_headers(
425 self.headers_plan,
426 job.subgraph_name,
427 response_headers,
428 self.client_request,
429 &mut ctx.response_headers_aggregator,
430 )
431 .with_plan_context(LazyPlanContext {
432 subgraph_name: || Some(job.subgraph_name.into()),
433 affected_path: || None,
434 })?;
435 }
436
437 let output_rewrites =
438 self.process_subgraph_response(ctx, job.response.bytes, job.fetch_node_id);
439
440 let mut entity_index_error_map: Option<HashMap<&usize, Vec<GraphQLErrorPath>>> =
441 None;
442
443 if let Some(mut entities) = job.response.data.take_entities() {
444 if let Some(output_rewrites) = output_rewrites {
445 for output_rewrite in output_rewrites {
446 for entity in &mut entities {
447 output_rewrite
448 .rewrite(&self.schema_metadata.possible_types, entity);
449 }
450 }
451 }
452
453 let mut index = 0;
454 let normalized_path = job.flatten_node_path.as_slice();
455 let initial_error_path = job
457 .response
458 .errors
459 .as_ref()
460 .map(|_| GraphQLErrorPath::with_capacity(normalized_path.len() + 2));
461 entity_index_error_map = job
462 .response
463 .errors
464 .as_ref()
465 .map(|_| HashMap::with_capacity(entities.len()));
466 traverse_and_callback_mut(
467 &mut ctx.final_response,
468 normalized_path,
469 self.schema_metadata,
470 initial_error_path,
471 &mut |target, error_path| {
472 let hash = job.representation_hashes[index];
473 if let Some(entity_index) = job.representation_hash_to_index.get(&hash)
474 {
475 if let (Some(error_path), Some(entity_index_error_map)) =
476 (error_path, entity_index_error_map.as_mut())
477 {
478 let error_paths = entity_index_error_map
479 .entry(entity_index)
480 .or_insert_with(Vec::new);
481 error_paths.push(error_path);
482 }
483 if let Some(entity) = entities.get(*entity_index) {
484 deep_merge(target, entity.clone());
485 }
486 }
487 index += 1;
488 },
489 );
490 }
491 ctx.handle_errors(
492 job.subgraph_name,
493 Some(job.flatten_node_path),
494 job.response.errors,
495 entity_index_error_map,
496 );
497 }
498 ExecutionJob::None => {
499 }
501 }
502 Ok(())
503 }
504
505 fn prepare_flatten_data(
506 &self,
507 final_response: &Value<'exec>,
508 flatten_node: &'exec FlattenNode,
509 ) -> Result<Option<PreparedFlattenData>, PlanExecutionError> {
510 let fetch_node = match flatten_node.node.as_ref() {
511 PlanNode::Fetch(fetch_node) => fetch_node,
512 _ => return Ok(None),
513 };
514 let requires_nodes = match fetch_node.requires.as_ref() {
515 Some(nodes) => nodes,
516 None => return Ok(None),
517 };
518
519 let mut index = 0;
520 let normalized_path = flatten_node.path.as_slice();
521 let mut filtered_representations = Vec::new();
522 filtered_representations.put(OPEN_BRACKET);
523 let proj_ctx = RequestProjectionContext::new(&self.schema_metadata.possible_types);
524 let mut representation_hashes: Vec<u64> = Vec::new();
525 let mut filtered_representations_hashes: HashMap<u64, usize> = HashMap::new();
526 let arena = bumpalo::Bump::new();
527
528 traverse_and_callback(
529 final_response,
530 normalized_path,
531 self.schema_metadata,
532 &mut |entity| {
533 let hash = entity.to_hash(&requires_nodes.items, proj_ctx.possible_types);
534
535 if !entity.is_null() {
536 representation_hashes.push(hash);
537 }
538
539 if filtered_representations_hashes.contains_key(&hash) {
540 return Ok::<(), PlanExecutionError>(());
541 }
542
543 let entity = if let Some(input_rewrites) = &fetch_node.input_rewrites {
544 let new_entity = arena.alloc(entity.clone());
545 for input_rewrite in input_rewrites {
546 input_rewrite.rewrite(&self.schema_metadata.possible_types, new_entity);
547 }
548 new_entity
549 } else {
550 entity
551 };
552
553 let is_projected = project_requires(
554 &proj_ctx,
555 &requires_nodes.items,
556 entity,
557 &mut filtered_representations,
558 filtered_representations_hashes.is_empty(),
559 None,
560 )
561 .with_plan_context(LazyPlanContext {
562 subgraph_name: || Some(fetch_node.service_name.clone()),
563 affected_path: || Some(flatten_node.path.to_string()),
564 })?;
565
566 if is_projected {
567 filtered_representations_hashes.insert(hash, index);
568 }
569
570 index += 1;
571
572 Ok(())
573 },
574 )?;
575 filtered_representations.put(CLOSE_BRACKET);
576
577 if filtered_representations_hashes.is_empty() {
578 return Ok(None);
579 }
580
581 Ok(Some(PreparedFlattenData {
582 representations: filtered_representations,
583 representation_hashes,
584 representation_hash_to_index: filtered_representations_hashes,
585 }))
586 }
587
588 async fn execute_flatten_fetch_node(
589 &'exec self,
590 node: &'exec FlattenNode,
591 representations: Option<Vec<u8>>,
592 representation_hashes: Option<Vec<u64>>,
593 filtered_representations_hashes: Option<HashMap<u64, usize>>,
594 ) -> Result<ExecutionJob<'exec>, PlanExecutionError> {
595 let fetch_node = match node.node.as_ref() {
596 PlanNode::Fetch(fetch_node) => fetch_node,
597 _ => return Ok(ExecutionJob::None),
598 };
599
600 match self.execute_fetch_node(fetch_node, representations).await? {
601 ExecutionJob::Fetch(job) => Ok(ExecutionJob::FlattenFetch(FlattenFetchJob {
602 flatten_node_path: &node.path,
603 response: job.response,
604 fetch_node_id: job.fetch_node_id,
605 subgraph_name: job.subgraph_name,
606 representation_hashes: representation_hashes.unwrap_or_default(),
607 representation_hash_to_index: filtered_representations_hashes.unwrap_or_default(),
608 })),
609 _ => Ok(ExecutionJob::None),
610 }
611 }
612
613 async fn execute_fetch_node(
614 &'exec self,
615 node: &'exec FetchNode,
616 representations: Option<Vec<u8>>,
617 ) -> Result<ExecutionJob<'exec>, PlanExecutionError> {
618 let mut headers_map = HeaderMap::new();
620 modify_subgraph_request_headers(
621 self.headers_plan,
622 &node.service_name,
623 self.client_request,
624 &mut headers_map,
625 )
626 .with_plan_context(LazyPlanContext {
627 subgraph_name: || Some(node.service_name.clone()),
628 affected_path: || None,
629 })?;
630 let variable_refs =
631 select_fetch_variables(self.variable_values, node.variable_usages.as_ref());
632
633 let mut subgraph_request = SubgraphExecutionRequest {
634 query: node.operation.document_str.as_str(),
635 dedupe: self.dedupe_subgraph_requests,
636 operation_name: node.operation_name.as_deref(),
637 variables: variable_refs,
638 representations,
639 headers: headers_map,
640 extensions: None,
641 };
642
643 if let Some(jwt_forwarding_plan) = &self.jwt_forwarding_plan {
644 subgraph_request.add_request_extensions_field(
645 jwt_forwarding_plan.extension_field_name.clone(),
646 jwt_forwarding_plan.extension_field_value.clone(),
647 );
648 }
649
650 Ok(ExecutionJob::Fetch(FetchJob {
651 fetch_node_id: node.id,
652 subgraph_name: &node.service_name,
653 response: self
654 .executors
655 .execute(&node.service_name, subgraph_request, self.client_request)
656 .await,
657 }))
658 }
659
    /// Emits a `tracing` error event for a plan execution failure, with the
    /// error's subgraph name and the error itself attached as fields.
    fn log_error(&self, error: &PlanExecutionError) {
        tracing::error!(
            subgraph_name = error.subgraph_name(),
            // Passed as a `dyn Error` trait object so it is recorded as an error value.
            error = error as &dyn std::error::Error,
            "Plan execution error"
        );
    }
667}
668
669fn condition_node_by_variables<'a>(
670 condition_node: &'a ConditionNode,
671 variable_values: &'a Option<HashMap<String, sonic_rs::Value>>,
672) -> Option<&'a PlanNode> {
673 let vars = variable_values.as_ref()?;
674 let value = vars.get(&condition_node.condition)?;
675 let condition_met = matches!(value.as_ref(), ValueRef::Bool(true));
676
677 if condition_met {
678 condition_node.if_clause.as_deref()
679 } else {
680 condition_node.else_clause.as_deref()
681 }
682}
683
684fn select_fetch_variables<'a>(
685 variable_values: &'a Option<HashMap<String, sonic_rs::Value>>,
686 variable_usages: Option<&BTreeSet<String>>,
687) -> Option<HashMap<&'a str, &'a sonic_rs::Value>> {
688 let values = variable_values.as_ref()?;
689
690 variable_usages.map(|variable_usages| {
691 variable_usages
692 .iter()
693 .filter_map(|var_name| {
694 values
695 .get_key_value(var_name.as_str())
696 .map(|(key, value)| (key.as_str(), value))
697 })
698 .collect()
699 })
700}
701
// Unit tests for `select_fetch_variables` and for the error path normalization
// performed by `ExecutionContext::handle_errors`.
#[cfg(test)]
mod tests {
    use crate::{
        context::ExecutionContext,
        response::graphql_error::{GraphQLErrorExtensions, GraphQLErrorPath},
    };

    use super::select_fetch_variables;
    use sonic_rs::Value;
    use std::collections::{BTreeSet, HashMap};

    // Builds a JSON number value by parsing the integer's decimal representation.
    fn value_from_number(n: i32) -> Value {
        sonic_rs::from_str(&n.to_string()).unwrap()
    }

    // Variables that are provided but not used by the fetch must not be selected.
    #[test]
    fn select_fetch_variables_only_used_variables() {
        let mut variable_values_map = HashMap::new();
        variable_values_map.insert("used".to_string(), value_from_number(1));
        variable_values_map.insert("unused".to_string(), value_from_number(2));
        let variable_values = Some(variable_values_map);

        let mut usages = BTreeSet::new();
        usages.insert("used".to_string());

        let selected = select_fetch_variables(&variable_values, Some(&usages)).unwrap();

        assert_eq!(selected.len(), 1);
        assert!(selected.contains_key("used"));
        assert!(!selected.contains_key("unused"));
    }

    // Used variable names that have no provided value are skipped.
    #[test]
    fn select_fetch_variables_ignores_missing_usage_entries() {
        let mut variable_values_map = HashMap::new();
        variable_values_map.insert("present".to_string(), value_from_number(3));
        let variable_values = Some(variable_values_map);

        let mut usages = BTreeSet::new();
        usages.insert("present".to_string());
        usages.insert("missing".to_string());

        let selected = select_fetch_variables(&variable_values, Some(&usages)).unwrap();

        assert_eq!(selected.len(), 1);
        assert!(selected.contains_key("present"));
        assert!(!selected.contains_key("missing"));
    }

    // Without a usage set the result is `None`, even when variables are provided.
    #[test]
    fn select_fetch_variables_for_no_usage_entries() {
        let mut variable_values_map = HashMap::new();
        variable_values_map.insert("unused_1".to_string(), value_from_number(1));
        variable_values_map.insert("unused_2".to_string(), value_from_number(2));

        let variable_values = Some(variable_values_map);

        let selected = select_fetch_variables(&variable_values, None);

        assert!(selected.is_none());
    }

    // One subgraph error at `_entities.0.field1`, with entity index 0 mapped to the
    // two response paths `a.0` and `b.1`, is expected to become two errors whose
    // paths are `a.0.field1` and `b.1.field1`.
    #[test]
    fn normalize_entity_errors_correctly() {
        use crate::response::graphql_error::{GraphQLError, GraphQLErrorPathSegment};
        use std::collections::HashMap;
        let mut ctx = ExecutionContext::default();
        // Entity index 0 was merged into two places of the final response.
        let mut entity_index_error_map: HashMap<&usize, Vec<GraphQLErrorPath>> = HashMap::new();
        entity_index_error_map.insert(
            &0,
            vec![
                GraphQLErrorPath {
                    segments: vec![
                        GraphQLErrorPathSegment::String("a".to_string()),
                        GraphQLErrorPathSegment::Index(0),
                    ],
                },
                GraphQLErrorPath {
                    segments: vec![
                        GraphQLErrorPathSegment::String("b".to_string()),
                        GraphQLErrorPathSegment::Index(1),
                    ],
                },
            ],
        );
        // The error as reported by the subgraph, relative to `_entities`.
        let response_errors = vec![GraphQLError {
            message: "Error 1".to_string(),
            locations: None,
            path: Some(GraphQLErrorPath {
                segments: vec![
                    GraphQLErrorPathSegment::String("_entities".to_string()),
                    GraphQLErrorPathSegment::Index(0),
                    GraphQLErrorPathSegment::String("field1".to_string()),
                ],
            }),
            extensions: GraphQLErrorExtensions::default(),
        }];
        ctx.handle_errors(
            "subgraph_a",
            None,
            Some(response_errors),
            Some(entity_index_error_map),
        );
        assert_eq!(ctx.errors.len(), 2);
        assert_eq!(ctx.errors[0].message, "Error 1");
        assert_eq!(
            ctx.errors[0].path.as_ref().unwrap().segments,
            vec![
                GraphQLErrorPathSegment::String("a".to_string()),
                GraphQLErrorPathSegment::Index(0),
                GraphQLErrorPathSegment::String("field1".to_string())
            ]
        );
        assert_eq!(ctx.errors[1].message, "Error 1");
        assert_eq!(
            ctx.errors[1].path.as_ref().unwrap().segments,
            vec![
                GraphQLErrorPathSegment::String("b".to_string()),
                GraphQLErrorPathSegment::Index(1),
                GraphQLErrorPathSegment::String("field1".to_string())
            ]
        );
    }
}