1use std::collections::HashMap;
2use std::sync::Arc;
3
4use futures::StreamExt;
5use futures::future::join_all;
6use futures::prelude::*;
7use tokio::sync::broadcast;
8use tokio::sync::mpsc;
9use tokio_stream::wrappers::BroadcastStream;
10use tower::ServiceExt;
11use tracing::Instrument;
12
13use super::DeferredNode;
14use super::PlanNode;
15use super::QueryPlan;
16use super::log;
17use super::subscription::SubscriptionHandle;
18use crate::Context;
19use crate::axum_factory::CanceledRequest;
20use crate::error::Error;
21use crate::graphql::Request;
22use crate::graphql::Response;
23use crate::json_ext::Object;
24use crate::json_ext::Path;
25use crate::json_ext::PathElement;
26use crate::json_ext::Value;
27use crate::json_ext::ValueExt;
28use crate::plugins::subscription::SubscriptionConfig;
29use crate::query_planner::CONDITION_ELSE_SPAN_NAME;
30use crate::query_planner::CONDITION_IF_SPAN_NAME;
31use crate::query_planner::CONDITION_SPAN_NAME;
32use crate::query_planner::DEFER_DEFERRED_SPAN_NAME;
33use crate::query_planner::DEFER_PRIMARY_SPAN_NAME;
34use crate::query_planner::DEFER_SPAN_NAME;
35use crate::query_planner::FLATTEN_SPAN_NAME;
36use crate::query_planner::FlattenNode;
37use crate::query_planner::PARALLEL_SPAN_NAME;
38use crate::query_planner::Primary;
39use crate::query_planner::SEQUENCE_SPAN_NAME;
40use crate::query_planner::fetch::FetchNode;
41use crate::query_planner::fetch::SubgraphSchemas;
42use crate::query_planner::fetch::Variables;
43use crate::services::FetchRequest;
44use crate::services::fetch;
45use crate::services::fetch::ErrorMapping;
46use crate::services::fetch::SubscriptionRequest;
47use crate::services::fetch_service::FetchServiceFactory;
48use crate::services::new_service::ServiceFactory;
49use crate::spec::Query;
50use crate::spec::Schema;
51
52impl QueryPlan {
53 #[allow(clippy::too_many_arguments)]
54 pub(crate) async fn execute<'a>(
56 &self,
57 context: &'a Context,
58 service_factory: &'a Arc<FetchServiceFactory>,
59 supergraph_request: &'a Arc<http::Request<Request>>,
63 schema: &'a Arc<Schema>,
64 subgraph_schemas: &'a Arc<SubgraphSchemas>,
65 sender: mpsc::Sender<Response>,
67 subscription_handle: Option<SubscriptionHandle>,
68 subscription_config: &'a Option<SubscriptionConfig>,
69 initial_value: Option<Value>,
71 ) -> Response {
72 let root = Path::empty();
73
74 log::trace_query_plan(&self.root);
75 let deferred_fetches = HashMap::new();
76
77 let (value, errors) = self
78 .root
79 .execute_recursively(
80 &ExecutionParameters {
81 context,
82 service_factory,
83 schema,
84 supergraph_request,
85 deferred_fetches: &deferred_fetches,
86 query: &self.query,
87 root_node: &self.root,
88 subscription_handle: &subscription_handle,
89 subscription_config,
90 subgraph_schemas,
91 },
92 &root,
93 &initial_value.unwrap_or_default(),
94 sender,
95 )
96 .await;
97 if !deferred_fetches.is_empty() {
98 u64_counter!(
99 "apollo.router.operations.defer",
100 "Number of requests that request deferred data",
101 1
102 );
103 }
104
105 Response::builder().data(value).errors(errors).build()
106 }
107
108 pub fn contains_mutations(&self) -> bool {
109 self.root.contains_mutations()
110 }
111
112 pub fn subgraph_fetches(&self) -> usize {
113 self.root.subgraph_fetches()
114 }
115}
116
117pub(crate) struct ExecutionParameters<'a> {
119 pub(crate) context: &'a Context,
120 pub(crate) service_factory: &'a Arc<FetchServiceFactory>,
121 pub(crate) schema: &'a Arc<Schema>,
122 pub(crate) subgraph_schemas: &'a Arc<SubgraphSchemas>,
123 pub(crate) supergraph_request: &'a Arc<http::Request<Request>>,
124 pub(crate) deferred_fetches: &'a HashMap<String, broadcast::Sender<(Value, Vec<Error>)>>,
125 pub(crate) query: &'a Arc<Query>,
126 pub(crate) root_node: &'a PlanNode,
127 pub(crate) subscription_handle: &'a Option<SubscriptionHandle>,
128 pub(crate) subscription_config: &'a Option<SubscriptionConfig>,
129}
130
131impl PlanNode {
132 pub(super) fn execute_recursively<'a>(
133 &'a self,
134 parameters: &'a ExecutionParameters<'a>,
135 current_dir: &'a Path,
136 parent_value: &'a Value,
137 sender: mpsc::Sender<Response>,
138 ) -> future::BoxFuture<'a, (Value, Vec<Error>)> {
139 Box::pin(async move {
140 tracing::trace!("executing plan:\n{:#?}", self);
141 let mut value;
142 let mut errors;
143
144 match self {
145 PlanNode::Sequence { nodes } => {
146 value = parent_value.clone();
147 errors = Vec::new();
148 async {
149 for node in nodes {
150 let (v, err) = node
151 .execute_recursively(
152 parameters,
153 current_dir,
154 &value,
155 sender.clone(),
156 )
157 .in_current_span()
158 .await;
159 value.type_aware_deep_merge(v, parameters.schema);
160 errors.extend(err.into_iter());
161 }
162 }
163 .instrument(tracing::info_span!(
164 SEQUENCE_SPAN_NAME,
165 "otel.kind" = "INTERNAL"
166 ))
167 .await
168 }
169 PlanNode::Parallel { nodes } => {
170 value = Value::default();
171 errors = Vec::new();
172 async {
173 let mut stream: stream::FuturesUnordered<_> = nodes
174 .iter()
175 .map(|plan| {
176 plan.execute_recursively(
177 parameters,
178 current_dir,
179 parent_value,
180 sender.clone(),
181 )
182 .in_current_span()
183 })
184 .collect();
185
186 while let Some((v, err)) = stream.next().in_current_span().await {
187 value.type_aware_deep_merge(v, parameters.schema);
188 errors.extend(err.into_iter());
189 }
190 }
191 .instrument(tracing::info_span!(
192 PARALLEL_SPAN_NAME,
193 "otel.kind" = "INTERNAL"
194 ))
195 .await
196 }
197 PlanNode::Flatten(FlattenNode { path, node }) => {
198 let current_dir = current_dir.join(path.remove_empty_key_root());
200 let (v, err) = node
201 .execute_recursively(
202 parameters,
203 ¤t_dir,
205 parent_value,
206 sender,
207 )
208 .instrument(tracing::info_span!(
209 FLATTEN_SPAN_NAME,
210 "graphql.path" = %current_dir,
211 "otel.kind" = "INTERNAL"
212 ))
213 .await;
214 value = v;
215 errors = err;
216 }
217 PlanNode::Subscription {
218 primary: subscription_node,
219 ..
220 } => {
221 if parameters.subscription_handle.is_none() {
222 tracing::error!("No subscription handle provided for a subscription");
223 value = Value::default();
224 errors = vec![
225 Error::builder()
226 .message("no subscription handle provided for a subscription")
227 .extension_code("NO_SUBSCRIPTION_HANDLE")
228 .build(),
229 ];
230 } else {
231 match Variables::new(
232 &[],
233 &subscription_node.variable_usages,
234 parent_value,
235 current_dir,
236 parameters.supergraph_request,
237 parameters.schema,
238 &subscription_node.input_rewrites,
239 &None,
240 ) {
241 Some(variables) => {
242 let service = parameters.service_factory.create();
243 let request = fetch::Request::Subscription(
244 SubscriptionRequest::builder()
245 .context(parameters.context.clone())
246 .subscription_node(subscription_node.clone())
247 .supergraph_request(parameters.supergraph_request.clone())
248 .variables(variables)
249 .current_dir(current_dir.clone())
250 .sender(sender)
251 .and_subscription_handle(
252 parameters.subscription_handle.clone(),
253 )
254 .and_subscription_config(
255 parameters.subscription_config.clone(),
256 )
257 .build(),
258 );
259 (value, errors) =
260 match service.oneshot(request).await.map_to_graphql_error(
261 subscription_node.service_name.to_string(),
262 current_dir,
263 ) {
264 Ok(r) => r,
265 Err(e) => (Value::default(), vec![e]),
266 };
267 }
268 None => {
269 value = Value::Object(Object::default());
270 errors = Vec::new();
271 }
272 };
273 }
274 }
275 PlanNode::Fetch(fetch_node) => {
276 if parameters
279 .context
280 .extensions()
281 .with_lock(|lock| lock.get::<CanceledRequest>().is_some())
282 {
283 value = Value::Object(Object::default());
284 errors = Vec::new();
285 } else {
286 match Variables::new(
287 &fetch_node.requires,
288 &fetch_node.variable_usages,
289 parent_value,
290 current_dir,
291 parameters.supergraph_request,
292 parameters.schema.as_ref(),
293 &fetch_node.input_rewrites,
294 &fetch_node.context_rewrites,
295 ) {
296 Some(variables) => {
297 let paths = variables.inverted_paths.clone();
298 let service = parameters.service_factory.create();
299 let request = fetch::Request::Fetch(
300 FetchRequest::builder()
301 .context(parameters.context.clone())
302 .fetch_node(fetch_node.clone())
303 .supergraph_request(parameters.supergraph_request.clone())
304 .variables(variables)
305 .current_dir(current_dir.clone())
306 .build(),
307 );
308 let raw_errors;
309 (value, raw_errors) =
310 match service.oneshot(request).await.map_to_graphql_error(
311 fetch_node.service_name.to_string(),
312 current_dir,
313 ) {
314 Ok(r) => r,
315 Err(e) => (Value::default(), vec![e]),
316 };
317
318 errors = Vec::default();
323 for err in raw_errors {
324 if let Some(err_path) = err.path.as_ref()
325 && err_path
326 .iter()
327 .any(|elem| matches!(elem, PathElement::Flatten(_)))
328 {
329 for path in paths.iter().flatten() {
330 if err_path.equal_if_flattened(path) {
331 let mut err = err.clone();
332 err.path = Some(path.clone());
333 errors.push(err);
334 }
335 }
336
337 continue;
338 }
339
340 errors.push(err);
341 }
342
343 FetchNode::deferred_fetches(
344 current_dir,
345 &fetch_node.id,
346 parameters.deferred_fetches,
347 &value,
348 &errors,
349 );
350 }
351 None => {
352 value = Value::Object(Object::default());
353 errors = Vec::new();
354 }
355 };
356 }
357 }
358 PlanNode::Defer {
359 primary: Primary { node, .. },
360 deferred,
361 } => {
362 value = parent_value.clone();
363 errors = Vec::new();
364 async {
365 let mut deferred_fetches: HashMap<
366 String,
367 broadcast::Sender<(Value, Vec<Error>)>,
368 > = HashMap::new();
369 let mut futures = Vec::new();
370
371 let (primary_sender, _) =
372 tokio::sync::broadcast::channel::<(Value, Vec<Error>)>(1);
373
374 for deferred_node in deferred {
375 let fut = deferred_node
376 .execute(
377 parameters,
378 parent_value,
379 sender.clone(),
380 &primary_sender,
381 &mut deferred_fetches,
382 )
383 .in_current_span();
384
385 futures.push(fut);
386 }
387
388 tokio::task::spawn(async move {
389 join_all(futures).await;
390 });
391
392 if let Some(node) = node {
393 let (v, err) = node
394 .execute_recursively(
395 &ExecutionParameters {
396 context: parameters.context,
397 service_factory: parameters.service_factory,
398 schema: parameters.schema,
399 supergraph_request: parameters.supergraph_request,
400 deferred_fetches: &deferred_fetches,
401 query: parameters.query,
402 root_node: parameters.root_node,
403 subscription_handle: parameters.subscription_handle,
404 subscription_config: parameters.subscription_config,
405 subgraph_schemas: parameters.subgraph_schemas,
406 },
407 current_dir,
408 &value,
409 sender,
410 )
411 .instrument(tracing::info_span!(
412 DEFER_PRIMARY_SPAN_NAME,
413 "otel.kind" = "INTERNAL"
414 ))
415 .await;
416 value.type_aware_deep_merge(v, parameters.schema);
417 errors.extend(err.into_iter());
418
419 let _ = primary_sender.send((value.clone(), errors.clone()));
420 } else {
421 let _ = primary_sender.send((value.clone(), errors.clone()));
422 value.deep_merge(Value::Object(Default::default()));
424 }
425 }
426 .instrument(tracing::info_span!(
427 DEFER_SPAN_NAME,
428 "otel.kind" = "INTERNAL"
429 ))
430 .await
431 }
432 PlanNode::Condition {
433 condition,
434 if_clause,
435 else_clause,
436 } => {
437 value = Value::default();
438 errors = Vec::new();
439
440 async {
441 let v = parameters
442 .query
443 .variable_value(
444 condition.as_str(),
445 ¶meters.supergraph_request.body().variables,
446 )
447 .unwrap_or(&Value::Bool(true)); if let &Value::Bool(true) = v {
450 if let Some(node) = if_clause {
452 let (v, err) = node
453 .execute_recursively(
454 parameters,
455 current_dir,
456 parent_value,
457 sender.clone(),
458 )
459 .instrument(tracing::info_span!(
460 CONDITION_IF_SPAN_NAME,
461 "otel.kind" = "INTERNAL"
462 ))
463 .await;
464 value.type_aware_deep_merge(v, parameters.schema);
465 errors.extend(err.into_iter());
466 } else if current_dir.is_empty() {
467 value.deep_merge(Value::Object(Default::default()));
470 }
471 } else if let Some(node) = else_clause {
472 let (v, err) = node
473 .execute_recursively(
474 parameters,
475 current_dir,
476 parent_value,
477 sender.clone(),
478 )
479 .instrument(tracing::info_span!(
480 CONDITION_ELSE_SPAN_NAME,
481 "otel.kind" = "INTERNAL"
482 ))
483 .await;
484 value.type_aware_deep_merge(v, parameters.schema);
485 errors.extend(err.into_iter());
486 } else if current_dir.is_empty() {
487 value.deep_merge(Value::Object(Default::default()));
490 }
491 }
492 .instrument(tracing::info_span!(
493 CONDITION_SPAN_NAME,
494 "graphql.condition" = condition,
495 "otel.kind" = "INTERNAL"
496 ))
497 .await
498 }
499 }
500
501 (value, errors)
502 })
503 }
504}
505
506impl DeferredNode {
507 fn execute<'a>(
508 &self,
509 parameters: &'a ExecutionParameters<'a>,
510 parent_value: &Value,
511 sender: mpsc::Sender<Response>,
512 primary_sender: &broadcast::Sender<(Value, Vec<Error>)>,
513 deferred_fetches: &mut HashMap<String, broadcast::Sender<(Value, Vec<Error>)>>,
514 ) -> impl Future<Output = ()> + use<> {
515 let mut deferred_receivers = Vec::new();
516
517 for d in self.depends.iter() {
518 match deferred_fetches.get(&d.id) {
519 None => {
520 let (sender, receiver) = tokio::sync::broadcast::channel(1);
521 deferred_fetches.insert(d.id.clone(), sender.clone());
522 deferred_receivers.push(StreamExt::into_future(BroadcastStream::new(receiver)));
523 }
524 Some(sender) => {
525 let receiver = sender.subscribe();
526 deferred_receivers.push(StreamExt::into_future(BroadcastStream::new(receiver)));
527 }
528 }
529 }
530
531 let is_depends_empty = self.depends.is_empty();
537
538 let mut stream: stream::FuturesUnordered<_> = deferred_receivers.into_iter().collect();
539 let deferred_inner = self.node.clone();
541 let deferred_path = self.query_path.clone();
542 let label = self.label.as_ref().map(|l| l.to_string());
543 let tx = sender;
544 let sc = parameters.schema.clone();
545 let subgraph_schemas = parameters.subgraph_schemas.clone();
546 let orig = parameters.supergraph_request.clone();
547 let sf = parameters.service_factory.clone();
548 let root_node = parameters.root_node.clone();
549 let ctx = parameters.context.clone();
550 let query = parameters.query.clone();
551 let subscription_handle = parameters.subscription_handle.clone();
552 let subscription_config = parameters.subscription_config.clone();
553 let mut primary_receiver = primary_sender.subscribe();
554 let mut value = parent_value.clone();
555 let depends_json = serde_json::to_string(&self.depends).unwrap_or_default();
556 async move {
557 let mut errors = Vec::new();
558
559 if is_depends_empty {
560 let (primary_value, primary_errors) =
561 primary_receiver.recv().await.unwrap_or_default();
562 value.type_aware_deep_merge(primary_value, &sc);
563 errors.extend(primary_errors)
564 } else {
565 while let Some((v, _remaining)) = stream.next().await {
566 if let Some(Ok((deferred_value, err))) = v {
571 value.type_aware_deep_merge(deferred_value, &sc);
572 errors.extend(err.into_iter())
573 }
574 }
575 }
576
577 let deferred_fetches = HashMap::new();
578
579 if let Some(node) = deferred_inner {
580 let (mut v, err) = node
581 .execute_recursively(
582 &ExecutionParameters {
583 context: &ctx,
584 service_factory: &sf,
585 schema: &sc,
586 supergraph_request: &orig,
587 deferred_fetches: &deferred_fetches,
588 query: &query,
589 root_node: &root_node,
590 subscription_handle: &subscription_handle,
591 subscription_config: &subscription_config,
592 subgraph_schemas: &subgraph_schemas,
593 },
594 &Path::default(),
595 &value,
596 tx.clone(),
597 )
598 .instrument(tracing::info_span!(
599 DEFER_DEFERRED_SPAN_NAME,
600 "graphql.label" = label,
601 "graphql.depends" = depends_json,
602 "graphql.path" = deferred_path.to_string(),
603 "otel.kind" = "INTERNAL"
604 ))
605 .await;
606
607 if !is_depends_empty {
608 let (primary_value, primary_errors) =
609 primary_receiver.recv().await.unwrap_or_default();
610 v.type_aware_deep_merge(primary_value, &sc);
611 errors.extend(primary_errors)
612 }
613
614 if let Err(e) = tx
615 .send(
616 Response::builder()
617 .data(v)
618 .errors(err)
619 .and_path(Some(deferred_path.clone()))
620 .and_label(label)
621 .build(),
622 )
623 .await
624 {
625 tracing::error!(
626 "error sending deferred response at path {}: {:?}",
627 deferred_path,
628 e
629 );
630 };
631 drop(tx);
632 } else {
633 let (primary_value, primary_errors) =
634 primary_receiver.recv().await.unwrap_or_default();
635 value.type_aware_deep_merge(primary_value, &sc);
636 errors.extend(primary_errors);
637
638 if let Err(e) = tx
639 .send(
640 Response::builder()
641 .data(value)
642 .errors(errors)
643 .and_path(Some(deferred_path.clone()))
644 .and_label(label)
645 .build(),
646 )
647 .await
648 {
649 tracing::error!(
650 "error sending deferred response at path {}: {:?}",
651 deferred_path,
652 e
653 );
654 }
655 drop(tx);
656 };
657 }
658 }
659}