1use std::collections::HashMap;
2use std::sync::Arc;
3
4use futures::future::join_all;
5use futures::prelude::*;
6use tokio::sync::broadcast;
7use tokio::sync::mpsc;
8use tokio_stream::wrappers::BroadcastStream;
9use tracing::Instrument;
10
11use super::DeferredNode;
12use super::PlanNode;
13use super::QueryPlan;
14use super::log;
15use super::subscription::SubscriptionHandle;
16use crate::Context;
17use crate::axum_factory::CanceledRequest;
18use crate::error::Error;
19use crate::graphql::Request;
20use crate::graphql::Response;
21use crate::json_ext::Object;
22use crate::json_ext::Path;
23use crate::json_ext::Value;
24use crate::json_ext::ValueExt;
25use crate::plugins::subscription::SubscriptionConfig;
26use crate::query_planner::CONDITION_ELSE_SPAN_NAME;
27use crate::query_planner::CONDITION_IF_SPAN_NAME;
28use crate::query_planner::CONDITION_SPAN_NAME;
29use crate::query_planner::DEFER_DEFERRED_SPAN_NAME;
30use crate::query_planner::DEFER_PRIMARY_SPAN_NAME;
31use crate::query_planner::DEFER_SPAN_NAME;
32use crate::query_planner::FETCH_SPAN_NAME;
33use crate::query_planner::FLATTEN_SPAN_NAME;
34use crate::query_planner::FlattenNode;
35use crate::query_planner::PARALLEL_SPAN_NAME;
36use crate::query_planner::Primary;
37use crate::query_planner::SEQUENCE_SPAN_NAME;
38use crate::query_planner::SUBSCRIBE_SPAN_NAME;
39use crate::query_planner::fetch::SubgraphSchemas;
40use crate::services::SubgraphServiceFactory;
41use crate::spec::Query;
42use crate::spec::Schema;
43
44impl QueryPlan {
45 #[allow(clippy::too_many_arguments)]
46 pub(crate) async fn execute<'a>(
48 &self,
49 context: &'a Context,
50 service_factory: &'a Arc<SubgraphServiceFactory>,
51 supergraph_request: &'a Arc<http::Request<Request>>,
52 schema: &'a Arc<Schema>,
53 subgraph_schemas: &'a Arc<SubgraphSchemas>,
54 sender: mpsc::Sender<Response>,
55 subscription_handle: Option<SubscriptionHandle>,
56 subscription_config: &'a Option<SubscriptionConfig>,
57 initial_value: Option<Value>,
58 ) -> Response {
59 let root = Path::empty();
60
61 log::trace_query_plan(&self.root);
62 let deferred_fetches = HashMap::new();
63
64 let (value, errors) = self
65 .root
66 .execute_recursively(
67 &ExecutionParameters {
68 context,
69 service_factory,
70 schema,
71 supergraph_request,
72 deferred_fetches: &deferred_fetches,
73 query: &self.query,
74 root_node: &self.root,
75 subscription_handle: &subscription_handle,
76 subscription_config,
77 subgraph_schemas,
78 },
79 &root,
80 &initial_value.unwrap_or_default(),
81 sender,
82 )
83 .await;
84 if !deferred_fetches.is_empty() {
85 u64_counter!(
86 "apollo.router.operations.defer",
87 "Number of requests that request deferred data",
88 1
89 );
90 }
91
92 Response::builder().data(value).errors(errors).build()
93 }
94
95 pub fn contains_mutations(&self) -> bool {
96 self.root.contains_mutations()
97 }
98
99 pub fn subgraph_fetches(&self) -> usize {
100 self.root.subgraph_fetches()
101 }
102}
103
104pub(crate) struct ExecutionParameters<'a> {
106 pub(crate) context: &'a Context,
107 pub(crate) service_factory: &'a Arc<SubgraphServiceFactory>,
108 pub(crate) schema: &'a Arc<Schema>,
109 pub(crate) subgraph_schemas: &'a Arc<SubgraphSchemas>,
110 pub(crate) supergraph_request: &'a Arc<http::Request<Request>>,
111 pub(crate) deferred_fetches: &'a HashMap<String, broadcast::Sender<(Value, Vec<Error>)>>,
112 pub(crate) query: &'a Arc<Query>,
113 pub(crate) root_node: &'a PlanNode,
114 pub(crate) subscription_handle: &'a Option<SubscriptionHandle>,
115 pub(crate) subscription_config: &'a Option<SubscriptionConfig>,
116}
117
118impl PlanNode {
119 pub(super) fn execute_recursively<'a>(
120 &'a self,
121 parameters: &'a ExecutionParameters<'a>,
122 current_dir: &'a Path,
123 parent_value: &'a Value,
124 sender: mpsc::Sender<Response>,
125 ) -> future::BoxFuture<'a, (Value, Vec<Error>)> {
126 Box::pin(async move {
127 tracing::trace!("executing plan:\n{:#?}", self);
128 let mut value;
129 let mut errors;
130
131 match self {
132 PlanNode::Sequence { nodes } => {
133 value = parent_value.clone();
134 errors = Vec::new();
135 async {
136 for node in nodes {
137 let (v, err) = node
138 .execute_recursively(
139 parameters,
140 current_dir,
141 &value,
142 sender.clone(),
143 )
144 .in_current_span()
145 .await;
146 value.type_aware_deep_merge(v, parameters.schema);
147 errors.extend(err.into_iter());
148 }
149 }
150 .instrument(tracing::info_span!(
151 SEQUENCE_SPAN_NAME,
152 "otel.kind" = "INTERNAL"
153 ))
154 .await
155 }
156 PlanNode::Parallel { nodes } => {
157 value = Value::default();
158 errors = Vec::new();
159 async {
160 let mut stream: stream::FuturesUnordered<_> = nodes
161 .iter()
162 .map(|plan| {
163 plan.execute_recursively(
164 parameters,
165 current_dir,
166 parent_value,
167 sender.clone(),
168 )
169 .in_current_span()
170 })
171 .collect();
172
173 while let Some((v, err)) = stream.next().in_current_span().await {
174 value.type_aware_deep_merge(v, parameters.schema);
175 errors.extend(err.into_iter());
176 }
177 }
178 .instrument(tracing::info_span!(
179 PARALLEL_SPAN_NAME,
180 "otel.kind" = "INTERNAL"
181 ))
182 .await
183 }
184 PlanNode::Flatten(FlattenNode { path, node }) => {
185 let current_dir = current_dir.join(path.remove_empty_key_root());
187 let (v, err) = node
188 .execute_recursively(
189 parameters,
190 ¤t_dir,
192 parent_value,
193 sender,
194 )
195 .instrument(tracing::info_span!(
196 FLATTEN_SPAN_NAME,
197 "graphql.path" = %current_dir,
198 "otel.kind" = "INTERNAL"
199 ))
200 .await;
201 value = v;
202 errors = err;
203 }
204 PlanNode::Subscription { primary, .. } => {
205 if parameters.subscription_handle.is_some() {
206 let fetch_time_offset =
207 parameters.context.created_at.elapsed().as_nanos() as i64;
208 errors = primary
209 .execute_recursively(parameters, current_dir, parent_value, sender)
210 .instrument(tracing::info_span!(
211 SUBSCRIBE_SPAN_NAME,
212 "otel.kind" = "INTERNAL",
213 "apollo.subgraph.name" = primary.service_name.as_ref(),
214 "apollo_private.sent_time_offset" = fetch_time_offset
215 ))
216 .await;
217 } else {
218 tracing::error!("No subscription handle provided for a subscription");
219 errors = vec![
220 Error::builder()
221 .message("no subscription handle provided for a subscription")
222 .extension_code("NO_SUBSCRIPTION_HANDLE")
223 .build(),
224 ];
225 };
226
227 value = Value::default();
228 }
229 PlanNode::Fetch(fetch_node) => {
230 let fetch_time_offset =
231 parameters.context.created_at.elapsed().as_nanos() as i64;
232
233 if parameters
236 .context
237 .extensions()
238 .with_lock(|lock| lock.get::<CanceledRequest>().is_some())
239 {
240 value = Value::Object(Object::default());
241 errors = Vec::new();
242 } else {
243 let (v, e) = fetch_node
244 .fetch_node(parameters, parent_value, current_dir)
245 .instrument(tracing::info_span!(
246 FETCH_SPAN_NAME,
247 "otel.kind" = "INTERNAL",
248 "apollo.subgraph.name" = fetch_node.service_name.as_ref(),
249 "apollo_private.sent_time_offset" = fetch_time_offset
250 ))
251 .await;
252 value = v;
253 errors = e;
254 }
255 }
256 PlanNode::Defer {
257 primary: Primary { node, .. },
258 deferred,
259 } => {
260 value = parent_value.clone();
261 errors = Vec::new();
262 async {
263 let mut deferred_fetches: HashMap<
264 String,
265 broadcast::Sender<(Value, Vec<Error>)>,
266 > = HashMap::new();
267 let mut futures = Vec::new();
268
269 let (primary_sender, _) =
270 tokio::sync::broadcast::channel::<(Value, Vec<Error>)>(1);
271
272 for deferred_node in deferred {
273 let fut = deferred_node
274 .execute(
275 parameters,
276 parent_value,
277 sender.clone(),
278 &primary_sender,
279 &mut deferred_fetches,
280 )
281 .in_current_span();
282
283 futures.push(fut);
284 }
285
286 tokio::task::spawn(async move {
287 join_all(futures).await;
288 });
289
290 if let Some(node) = node {
291 let (v, err) = node
292 .execute_recursively(
293 &ExecutionParameters {
294 context: parameters.context,
295 service_factory: parameters.service_factory,
296 schema: parameters.schema,
297 supergraph_request: parameters.supergraph_request,
298 deferred_fetches: &deferred_fetches,
299 query: parameters.query,
300 root_node: parameters.root_node,
301 subscription_handle: parameters.subscription_handle,
302 subscription_config: parameters.subscription_config,
303 subgraph_schemas: parameters.subgraph_schemas,
304 },
305 current_dir,
306 &value,
307 sender,
308 )
309 .instrument(tracing::info_span!(
310 DEFER_PRIMARY_SPAN_NAME,
311 "otel.kind" = "INTERNAL"
312 ))
313 .await;
314 value.type_aware_deep_merge(v, parameters.schema);
315 errors.extend(err.into_iter());
316
317 let _ = primary_sender.send((value.clone(), errors.clone()));
318 } else {
319 let _ = primary_sender.send((value.clone(), errors.clone()));
320 value.deep_merge(Value::Object(Default::default()));
322 }
323 }
324 .instrument(tracing::info_span!(
325 DEFER_SPAN_NAME,
326 "otel.kind" = "INTERNAL"
327 ))
328 .await
329 }
330 PlanNode::Condition {
331 condition,
332 if_clause,
333 else_clause,
334 } => {
335 value = Value::default();
336 errors = Vec::new();
337
338 async {
339 let v = parameters
340 .query
341 .variable_value(
342 condition.as_str(),
343 ¶meters.supergraph_request.body().variables,
344 )
345 .unwrap_or(&Value::Bool(true)); if let &Value::Bool(true) = v {
348 if let Some(node) = if_clause {
350 let (v, err) = node
351 .execute_recursively(
352 parameters,
353 current_dir,
354 parent_value,
355 sender.clone(),
356 )
357 .instrument(tracing::info_span!(
358 CONDITION_IF_SPAN_NAME,
359 "otel.kind" = "INTERNAL"
360 ))
361 .await;
362 value.type_aware_deep_merge(v, parameters.schema);
363 errors.extend(err.into_iter());
364 } else if current_dir.is_empty() {
365 value.deep_merge(Value::Object(Default::default()));
368 }
369 } else if let Some(node) = else_clause {
370 let (v, err) = node
371 .execute_recursively(
372 parameters,
373 current_dir,
374 parent_value,
375 sender.clone(),
376 )
377 .instrument(tracing::info_span!(
378 CONDITION_ELSE_SPAN_NAME,
379 "otel.kind" = "INTERNAL"
380 ))
381 .await;
382 value.type_aware_deep_merge(v, parameters.schema);
383 errors.extend(err.into_iter());
384 } else if current_dir.is_empty() {
385 value.deep_merge(Value::Object(Default::default()));
388 }
389 }
390 .instrument(tracing::info_span!(
391 CONDITION_SPAN_NAME,
392 "graphql.condition" = condition,
393 "otel.kind" = "INTERNAL"
394 ))
395 .await
396 }
397 }
398
399 (value, errors)
400 })
401 }
402}
403
404impl DeferredNode {
405 fn execute<'a>(
406 &self,
407 parameters: &'a ExecutionParameters<'a>,
408 parent_value: &Value,
409 sender: mpsc::Sender<Response>,
410 primary_sender: &broadcast::Sender<(Value, Vec<Error>)>,
411 deferred_fetches: &mut HashMap<String, broadcast::Sender<(Value, Vec<Error>)>>,
412 ) -> impl Future<Output = ()> {
413 let mut deferred_receivers = Vec::new();
414
415 for d in self.depends.iter() {
416 match deferred_fetches.get(&d.id) {
417 None => {
418 let (sender, receiver) = tokio::sync::broadcast::channel(1);
419 deferred_fetches.insert(d.id.clone(), sender.clone());
420 deferred_receivers.push(BroadcastStream::new(receiver).into_future());
421 }
422 Some(sender) => {
423 let receiver = sender.subscribe();
424 deferred_receivers.push(BroadcastStream::new(receiver).into_future());
425 }
426 }
427 }
428
429 let is_depends_empty = self.depends.is_empty();
435
436 let mut stream: stream::FuturesUnordered<_> = deferred_receivers.into_iter().collect();
437 let deferred_inner = self.node.clone();
439 let deferred_path = self.query_path.clone();
440 let label = self.label.as_ref().map(|l| l.to_string());
441 let tx = sender;
442 let sc = parameters.schema.clone();
443 let subgraph_schemas = parameters.subgraph_schemas.clone();
444 let orig = parameters.supergraph_request.clone();
445 let sf = parameters.service_factory.clone();
446 let root_node = parameters.root_node.clone();
447 let ctx = parameters.context.clone();
448 let query = parameters.query.clone();
449 let subscription_handle = parameters.subscription_handle.clone();
450 let subscription_config = parameters.subscription_config.clone();
451 let mut primary_receiver = primary_sender.subscribe();
452 let mut value = parent_value.clone();
453 let depends_json = serde_json::to_string(&self.depends).unwrap_or_default();
454 async move {
455 let mut errors = Vec::new();
456
457 if is_depends_empty {
458 let (primary_value, primary_errors) =
459 primary_receiver.recv().await.unwrap_or_default();
460 value.type_aware_deep_merge(primary_value, &sc);
461 errors.extend(primary_errors)
462 } else {
463 while let Some((v, _remaining)) = stream.next().await {
464 if let Some(Ok((deferred_value, err))) = v {
469 value.type_aware_deep_merge(deferred_value, &sc);
470 errors.extend(err.into_iter())
471 }
472 }
473 }
474
475 let deferred_fetches = HashMap::new();
476
477 if let Some(node) = deferred_inner {
478 let (mut v, err) = node
479 .execute_recursively(
480 &ExecutionParameters {
481 context: &ctx,
482 service_factory: &sf,
483 schema: &sc,
484 supergraph_request: &orig,
485 deferred_fetches: &deferred_fetches,
486 query: &query,
487 root_node: &root_node,
488 subscription_handle: &subscription_handle,
489 subscription_config: &subscription_config,
490 subgraph_schemas: &subgraph_schemas,
491 },
492 &Path::default(),
493 &value,
494 tx.clone(),
495 )
496 .instrument(tracing::info_span!(
497 DEFER_DEFERRED_SPAN_NAME,
498 "graphql.label" = label,
499 "graphql.depends" = depends_json,
500 "graphql.path" = deferred_path.to_string(),
501 "otel.kind" = "INTERNAL"
502 ))
503 .await;
504
505 if !is_depends_empty {
506 let (primary_value, primary_errors) =
507 primary_receiver.recv().await.unwrap_or_default();
508 v.type_aware_deep_merge(primary_value, &sc);
509 errors.extend(primary_errors)
510 }
511
512 if let Err(e) = tx
513 .send(
514 Response::builder()
515 .data(v)
516 .errors(err)
517 .and_path(Some(deferred_path.clone()))
518 .and_label(label)
519 .build(),
520 )
521 .await
522 {
523 tracing::error!(
524 "error sending deferred response at path {}: {:?}",
525 deferred_path,
526 e
527 );
528 };
529 drop(tx);
530 } else {
531 let (primary_value, primary_errors) =
532 primary_receiver.recv().await.unwrap_or_default();
533 value.type_aware_deep_merge(primary_value, &sc);
534 errors.extend(primary_errors);
535
536 if let Err(e) = tx
537 .send(
538 Response::builder()
539 .data(value)
540 .errors(errors)
541 .and_path(Some(deferred_path.clone()))
542 .and_label(label)
543 .build(),
544 )
545 .await
546 {
547 tracing::error!(
548 "error sending deferred response at path {}: {:?}",
549 deferred_path,
550 e
551 );
552 }
553 drop(tx);
554 };
555 }
556 }
557}