Skip to main content

apollo_router/query_planner/
execution.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use futures::future::join_all;
5use futures::prelude::*;
6use tokio::sync::broadcast;
7use tokio::sync::mpsc;
8use tokio_stream::wrappers::BroadcastStream;
9use tracing::Instrument;
10
11use super::DeferredNode;
12use super::PlanNode;
13use super::QueryPlan;
14use super::log;
15use super::subscription::SubscriptionHandle;
16use crate::Context;
17use crate::axum_factory::CanceledRequest;
18use crate::error::Error;
19use crate::graphql::Request;
20use crate::graphql::Response;
21use crate::json_ext::Object;
22use crate::json_ext::Path;
23use crate::json_ext::Value;
24use crate::json_ext::ValueExt;
25use crate::plugins::subscription::SubscriptionConfig;
26use crate::query_planner::CONDITION_ELSE_SPAN_NAME;
27use crate::query_planner::CONDITION_IF_SPAN_NAME;
28use crate::query_planner::CONDITION_SPAN_NAME;
29use crate::query_planner::DEFER_DEFERRED_SPAN_NAME;
30use crate::query_planner::DEFER_PRIMARY_SPAN_NAME;
31use crate::query_planner::DEFER_SPAN_NAME;
32use crate::query_planner::FETCH_SPAN_NAME;
33use crate::query_planner::FLATTEN_SPAN_NAME;
34use crate::query_planner::FlattenNode;
35use crate::query_planner::PARALLEL_SPAN_NAME;
36use crate::query_planner::Primary;
37use crate::query_planner::SEQUENCE_SPAN_NAME;
38use crate::query_planner::SUBSCRIBE_SPAN_NAME;
39use crate::query_planner::fetch::SubgraphSchemas;
40use crate::services::SubgraphServiceFactory;
41use crate::spec::Query;
42use crate::spec::Schema;
43
44impl QueryPlan {
45    #[allow(clippy::too_many_arguments)]
46    /// Execute the plan and return a [`Response`].
47    pub(crate) async fn execute<'a>(
48        &self,
49        context: &'a Context,
50        service_factory: &'a Arc<SubgraphServiceFactory>,
51        supergraph_request: &'a Arc<http::Request<Request>>,
52        schema: &'a Arc<Schema>,
53        subgraph_schemas: &'a Arc<SubgraphSchemas>,
54        sender: mpsc::Sender<Response>,
55        subscription_handle: Option<SubscriptionHandle>,
56        subscription_config: &'a Option<SubscriptionConfig>,
57        initial_value: Option<Value>,
58    ) -> Response {
59        let root = Path::empty();
60
61        log::trace_query_plan(&self.root);
62        let deferred_fetches = HashMap::new();
63
64        let (value, errors) = self
65            .root
66            .execute_recursively(
67                &ExecutionParameters {
68                    context,
69                    service_factory,
70                    schema,
71                    supergraph_request,
72                    deferred_fetches: &deferred_fetches,
73                    query: &self.query,
74                    root_node: &self.root,
75                    subscription_handle: &subscription_handle,
76                    subscription_config,
77                    subgraph_schemas,
78                },
79                &root,
80                &initial_value.unwrap_or_default(),
81                sender,
82            )
83            .await;
84        if !deferred_fetches.is_empty() {
85            u64_counter!(
86                "apollo.router.operations.defer",
87                "Number of requests that request deferred data",
88                1
89            );
90        }
91
92        Response::builder().data(value).errors(errors).build()
93    }
94
95    pub fn contains_mutations(&self) -> bool {
96        self.root.contains_mutations()
97    }
98
99    pub fn subgraph_fetches(&self) -> usize {
100        self.root.subgraph_fetches()
101    }
102}
103
/// Holds the query plan execution arguments that do not change between calls.
///
/// Every field is a shared reference, so a new set of parameters can be assembled
/// cheaply when only one of them has to differ for a nested execution.
pub(crate) struct ExecutionParameters<'a> {
    /// Request context; the fetch step reads its `created_at` and checks its
    /// extensions for a `CanceledRequest` marker.
    pub(crate) context: &'a Context,
    /// Shared subgraph service factory, forwarded unchanged to nested executions.
    pub(crate) service_factory: &'a Arc<SubgraphServiceFactory>,
    /// Schema passed to `type_aware_deep_merge` when node results are merged.
    pub(crate) schema: &'a Arc<Schema>,
    /// Shared subgraph schemas, forwarded unchanged to nested executions.
    pub(crate) subgraph_schemas: &'a Arc<SubgraphSchemas>,
    /// The incoming request; the `variables` of its body are read when a
    /// `Condition` node is evaluated.
    pub(crate) supergraph_request: &'a Arc<http::Request<Request>>,
    /// Broadcast senders for deferred fetches; `DeferredNode::execute` registers
    /// and looks them up by the `id` of each of its dependencies.
    pub(crate) deferred_fetches: &'a HashMap<String, broadcast::Sender<(Value, Vec<Error>)>>,
    /// Query whose `variable_value` is consulted when a `Condition` node is evaluated.
    pub(crate) query: &'a Arc<Query>,
    /// Root node of the query plan being executed.
    pub(crate) root_node: &'a PlanNode,
    /// Subscription handle; a `Subscription` node only runs when this is `Some`.
    pub(crate) subscription_handle: &'a Option<SubscriptionHandle>,
    /// Optional subscription configuration, forwarded unchanged to nested executions.
    pub(crate) subscription_config: &'a Option<SubscriptionConfig>,
}
117
118impl PlanNode {
119    pub(super) fn execute_recursively<'a>(
120        &'a self,
121        parameters: &'a ExecutionParameters<'a>,
122        current_dir: &'a Path,
123        parent_value: &'a Value,
124        sender: mpsc::Sender<Response>,
125    ) -> future::BoxFuture<'a, (Value, Vec<Error>)> {
126        Box::pin(async move {
127            tracing::trace!("executing plan:\n{:#?}", self);
128            let mut value;
129            let mut errors;
130
131            match self {
132                PlanNode::Sequence { nodes } => {
133                    value = parent_value.clone();
134                    errors = Vec::new();
135                    async {
136                        for node in nodes {
137                            let (v, err) = node
138                                .execute_recursively(
139                                    parameters,
140                                    current_dir,
141                                    &value,
142                                    sender.clone(),
143                                )
144                                .in_current_span()
145                                .await;
146                            value.type_aware_deep_merge(v, parameters.schema);
147                            errors.extend(err.into_iter());
148                        }
149                    }
150                    .instrument(tracing::info_span!(
151                        SEQUENCE_SPAN_NAME,
152                        "otel.kind" = "INTERNAL"
153                    ))
154                    .await
155                }
156                PlanNode::Parallel { nodes } => {
157                    value = Value::default();
158                    errors = Vec::new();
159                    async {
160                        let mut stream: stream::FuturesUnordered<_> = nodes
161                            .iter()
162                            .map(|plan| {
163                                plan.execute_recursively(
164                                    parameters,
165                                    current_dir,
166                                    parent_value,
167                                    sender.clone(),
168                                )
169                                .in_current_span()
170                            })
171                            .collect();
172
173                        while let Some((v, err)) = stream.next().in_current_span().await {
174                            value.type_aware_deep_merge(v, parameters.schema);
175                            errors.extend(err.into_iter());
176                        }
177                    }
178                    .instrument(tracing::info_span!(
179                        PARALLEL_SPAN_NAME,
180                        "otel.kind" = "INTERNAL"
181                    ))
182                    .await
183                }
184                PlanNode::Flatten(FlattenNode { path, node }) => {
185                    // Note that the span must be `info` as we need to pick this up in apollo tracing
186                    let current_dir = current_dir.join(path.remove_empty_key_root());
187                    let (v, err) = node
188                        .execute_recursively(
189                            parameters,
190                            // this is the only command that actually changes the "current dir"
191                            &current_dir,
192                            parent_value,
193                            sender,
194                        )
195                        .instrument(tracing::info_span!(
196                            FLATTEN_SPAN_NAME,
197                            "graphql.path" = %current_dir,
198                            "otel.kind" = "INTERNAL"
199                        ))
200                        .await;
201                    value = v;
202                    errors = err;
203                }
204                PlanNode::Subscription { primary, .. } => {
205                    if parameters.subscription_handle.is_some() {
206                        let fetch_time_offset =
207                            parameters.context.created_at.elapsed().as_nanos() as i64;
208                        errors = primary
209                            .execute_recursively(parameters, current_dir, parent_value, sender)
210                            .instrument(tracing::info_span!(
211                                SUBSCRIBE_SPAN_NAME,
212                                "otel.kind" = "INTERNAL",
213                                "apollo.subgraph.name" = primary.service_name.as_ref(),
214                                "apollo_private.sent_time_offset" = fetch_time_offset
215                            ))
216                            .await;
217                    } else {
218                        tracing::error!("No subscription handle provided for a subscription");
219                        errors = vec![
220                            Error::builder()
221                                .message("no subscription handle provided for a subscription")
222                                .extension_code("NO_SUBSCRIPTION_HANDLE")
223                                .build(),
224                        ];
225                    };
226
227                    value = Value::default();
228                }
229                PlanNode::Fetch(fetch_node) => {
230                    let fetch_time_offset =
231                        parameters.context.created_at.elapsed().as_nanos() as i64;
232
233                    // The client closed the connection, we are still executing the request pipeline,
234                    // but we won't send unused trafic to subgraph
235                    if parameters
236                        .context
237                        .extensions()
238                        .with_lock(|lock| lock.get::<CanceledRequest>().is_some())
239                    {
240                        value = Value::Object(Object::default());
241                        errors = Vec::new();
242                    } else {
243                        let (v, e) = fetch_node
244                            .fetch_node(parameters, parent_value, current_dir)
245                            .instrument(tracing::info_span!(
246                                FETCH_SPAN_NAME,
247                                "otel.kind" = "INTERNAL",
248                                "apollo.subgraph.name" = fetch_node.service_name.as_ref(),
249                                "apollo_private.sent_time_offset" = fetch_time_offset
250                            ))
251                            .await;
252                        value = v;
253                        errors = e;
254                    }
255                }
256                PlanNode::Defer {
257                    primary: Primary { node, .. },
258                    deferred,
259                } => {
260                    value = parent_value.clone();
261                    errors = Vec::new();
262                    async {
263                        let mut deferred_fetches: HashMap<
264                            String,
265                            broadcast::Sender<(Value, Vec<Error>)>,
266                        > = HashMap::new();
267                        let mut futures = Vec::new();
268
269                        let (primary_sender, _) =
270                            tokio::sync::broadcast::channel::<(Value, Vec<Error>)>(1);
271
272                        for deferred_node in deferred {
273                            let fut = deferred_node
274                                .execute(
275                                    parameters,
276                                    parent_value,
277                                    sender.clone(),
278                                    &primary_sender,
279                                    &mut deferred_fetches,
280                                )
281                                .in_current_span();
282
283                            futures.push(fut);
284                        }
285
286                        tokio::task::spawn(async move {
287                            join_all(futures).await;
288                        });
289
290                        if let Some(node) = node {
291                            let (v, err) = node
292                                .execute_recursively(
293                                    &ExecutionParameters {
294                                        context: parameters.context,
295                                        service_factory: parameters.service_factory,
296                                        schema: parameters.schema,
297                                        supergraph_request: parameters.supergraph_request,
298                                        deferred_fetches: &deferred_fetches,
299                                        query: parameters.query,
300                                        root_node: parameters.root_node,
301                                        subscription_handle: parameters.subscription_handle,
302                                        subscription_config: parameters.subscription_config,
303                                        subgraph_schemas: parameters.subgraph_schemas,
304                                    },
305                                    current_dir,
306                                    &value,
307                                    sender,
308                                )
309                                .instrument(tracing::info_span!(
310                                    DEFER_PRIMARY_SPAN_NAME,
311                                    "otel.kind" = "INTERNAL"
312                                ))
313                                .await;
314                            value.type_aware_deep_merge(v, parameters.schema);
315                            errors.extend(err.into_iter());
316
317                            let _ = primary_sender.send((value.clone(), errors.clone()));
318                        } else {
319                            let _ = primary_sender.send((value.clone(), errors.clone()));
320                            // primary response should be an empty object
321                            value.deep_merge(Value::Object(Default::default()));
322                        }
323                    }
324                    .instrument(tracing::info_span!(
325                        DEFER_SPAN_NAME,
326                        "otel.kind" = "INTERNAL"
327                    ))
328                    .await
329                }
330                PlanNode::Condition {
331                    condition,
332                    if_clause,
333                    else_clause,
334                } => {
335                    value = Value::default();
336                    errors = Vec::new();
337
338                    async {
339                        let v = parameters
340                            .query
341                            .variable_value(
342                                condition.as_str(),
343                                &parameters.supergraph_request.body().variables,
344                            )
345                            .unwrap_or(&Value::Bool(true)); // the defer if clause is mandatory, and defaults to true
346
347                        if let &Value::Bool(true) = v {
348                            //FIXME: should we show an error if the if_node was not present?
349                            if let Some(node) = if_clause {
350                                let (v, err) = node
351                                    .execute_recursively(
352                                        parameters,
353                                        current_dir,
354                                        parent_value,
355                                        sender.clone(),
356                                    )
357                                    .instrument(tracing::info_span!(
358                                        CONDITION_IF_SPAN_NAME,
359                                        "otel.kind" = "INTERNAL"
360                                    ))
361                                    .await;
362                                value.type_aware_deep_merge(v, parameters.schema);
363                                errors.extend(err.into_iter());
364                            } else if current_dir.is_empty() {
365                                // If the condition is on the root selection set and it's the only one
366                                // For queries like {get @skip(if: true) {id name}}
367                                value.deep_merge(Value::Object(Default::default()));
368                            }
369                        } else if let Some(node) = else_clause {
370                            let (v, err) = node
371                                .execute_recursively(
372                                    parameters,
373                                    current_dir,
374                                    parent_value,
375                                    sender.clone(),
376                                )
377                                .instrument(tracing::info_span!(
378                                    CONDITION_ELSE_SPAN_NAME,
379                                    "otel.kind" = "INTERNAL"
380                                ))
381                                .await;
382                            value.type_aware_deep_merge(v, parameters.schema);
383                            errors.extend(err.into_iter());
384                        } else if current_dir.is_empty() {
385                            // If the condition is on the root selection set and it's the only one
386                            // For queries like {get @include(if: false) {id name}}
387                            value.deep_merge(Value::Object(Default::default()));
388                        }
389                    }
390                    .instrument(tracing::info_span!(
391                        CONDITION_SPAN_NAME,
392                        "graphql.condition" = condition,
393                        "otel.kind" = "INTERNAL"
394                    ))
395                    .await
396                }
397            }
398
399            (value, errors)
400        })
401    }
402}
403
404impl DeferredNode {
405    fn execute<'a>(
406        &self,
407        parameters: &'a ExecutionParameters<'a>,
408        parent_value: &Value,
409        sender: mpsc::Sender<Response>,
410        primary_sender: &broadcast::Sender<(Value, Vec<Error>)>,
411        deferred_fetches: &mut HashMap<String, broadcast::Sender<(Value, Vec<Error>)>>,
412    ) -> impl Future<Output = ()> {
413        let mut deferred_receivers = Vec::new();
414
415        for d in self.depends.iter() {
416            match deferred_fetches.get(&d.id) {
417                None => {
418                    let (sender, receiver) = tokio::sync::broadcast::channel(1);
419                    deferred_fetches.insert(d.id.clone(), sender.clone());
420                    deferred_receivers.push(BroadcastStream::new(receiver).into_future());
421                }
422                Some(sender) => {
423                    let receiver = sender.subscribe();
424                    deferred_receivers.push(BroadcastStream::new(receiver).into_future());
425                }
426            }
427        }
428
429        // if a deferred node has no depends (ie not waiting for data from fetches) then it has to
430        // wait until the primary response is entirely created.
431        //
432        // If the depends list is not empty, the inner node can start working on the fetched data, then
433        // it is merged into the primary response before applying the subselection
434        let is_depends_empty = self.depends.is_empty();
435
436        let mut stream: stream::FuturesUnordered<_> = deferred_receivers.into_iter().collect();
437        //FIXME/ is there a solution without cloning the entire node? Maybe it could be moved instead?
438        let deferred_inner = self.node.clone();
439        let deferred_path = self.query_path.clone();
440        let label = self.label.as_ref().map(|l| l.to_string());
441        let tx = sender;
442        let sc = parameters.schema.clone();
443        let subgraph_schemas = parameters.subgraph_schemas.clone();
444        let orig = parameters.supergraph_request.clone();
445        let sf = parameters.service_factory.clone();
446        let root_node = parameters.root_node.clone();
447        let ctx = parameters.context.clone();
448        let query = parameters.query.clone();
449        let subscription_handle = parameters.subscription_handle.clone();
450        let subscription_config = parameters.subscription_config.clone();
451        let mut primary_receiver = primary_sender.subscribe();
452        let mut value = parent_value.clone();
453        let depends_json = serde_json::to_string(&self.depends).unwrap_or_default();
454        async move {
455            let mut errors = Vec::new();
456
457            if is_depends_empty {
458                let (primary_value, primary_errors) =
459                    primary_receiver.recv().await.unwrap_or_default();
460                value.type_aware_deep_merge(primary_value, &sc);
461                errors.extend(primary_errors)
462            } else {
463                while let Some((v, _remaining)) = stream.next().await {
464                    // a Err(RecvError) means either that the fetch was not performed and the
465                    // sender was dropped, possibly because there was no need to do it,
466                    // or because it is lagging, but here we only send one message so it
467                    // will not happen
468                    if let Some(Ok((deferred_value, err))) = v {
469                        value.type_aware_deep_merge(deferred_value, &sc);
470                        errors.extend(err.into_iter())
471                    }
472                }
473            }
474
475            let deferred_fetches = HashMap::new();
476
477            if let Some(node) = deferred_inner {
478                let (mut v, err) = node
479                    .execute_recursively(
480                        &ExecutionParameters {
481                            context: &ctx,
482                            service_factory: &sf,
483                            schema: &sc,
484                            supergraph_request: &orig,
485                            deferred_fetches: &deferred_fetches,
486                            query: &query,
487                            root_node: &root_node,
488                            subscription_handle: &subscription_handle,
489                            subscription_config: &subscription_config,
490                            subgraph_schemas: &subgraph_schemas,
491                        },
492                        &Path::default(),
493                        &value,
494                        tx.clone(),
495                    )
496                    .instrument(tracing::info_span!(
497                        DEFER_DEFERRED_SPAN_NAME,
498                        "graphql.label" = label,
499                        "graphql.depends" = depends_json,
500                        "graphql.path" = deferred_path.to_string(),
501                        "otel.kind" = "INTERNAL"
502                    ))
503                    .await;
504
505                if !is_depends_empty {
506                    let (primary_value, primary_errors) =
507                        primary_receiver.recv().await.unwrap_or_default();
508                    v.type_aware_deep_merge(primary_value, &sc);
509                    errors.extend(primary_errors)
510                }
511
512                if let Err(e) = tx
513                    .send(
514                        Response::builder()
515                            .data(v)
516                            .errors(err)
517                            .and_path(Some(deferred_path.clone()))
518                            .and_label(label)
519                            .build(),
520                    )
521                    .await
522                {
523                    tracing::error!(
524                        "error sending deferred response at path {}: {:?}",
525                        deferred_path,
526                        e
527                    );
528                };
529                drop(tx);
530            } else {
531                let (primary_value, primary_errors) =
532                    primary_receiver.recv().await.unwrap_or_default();
533                value.type_aware_deep_merge(primary_value, &sc);
534                errors.extend(primary_errors);
535
536                if let Err(e) = tx
537                    .send(
538                        Response::builder()
539                            .data(value)
540                            .errors(errors)
541                            .and_path(Some(deferred_path.clone()))
542                            .and_label(label)
543                            .build(),
544                    )
545                    .await
546                {
547                    tracing::error!(
548                        "error sending deferred response at path {}: {:?}",
549                        deferred_path,
550                        e
551                    );
552                }
553                drop(tx);
554            };
555        }
556    }
557}