apollo_router/query_planner/
execution.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use futures::StreamExt;
5use futures::future::join_all;
6use futures::prelude::*;
7use tokio::sync::broadcast;
8use tokio::sync::mpsc;
9use tokio_stream::wrappers::BroadcastStream;
10use tower::ServiceExt;
11use tracing::Instrument;
12
13use super::DeferredNode;
14use super::PlanNode;
15use super::QueryPlan;
16use super::log;
17use super::subscription::SubscriptionHandle;
18use crate::Context;
19use crate::axum_factory::CanceledRequest;
20use crate::error::Error;
21use crate::graphql::Request;
22use crate::graphql::Response;
23use crate::json_ext::Object;
24use crate::json_ext::Path;
25use crate::json_ext::PathElement;
26use crate::json_ext::Value;
27use crate::json_ext::ValueExt;
28use crate::plugins::subscription::SubscriptionConfig;
29use crate::query_planner::CONDITION_ELSE_SPAN_NAME;
30use crate::query_planner::CONDITION_IF_SPAN_NAME;
31use crate::query_planner::CONDITION_SPAN_NAME;
32use crate::query_planner::DEFER_DEFERRED_SPAN_NAME;
33use crate::query_planner::DEFER_PRIMARY_SPAN_NAME;
34use crate::query_planner::DEFER_SPAN_NAME;
35use crate::query_planner::FLATTEN_SPAN_NAME;
36use crate::query_planner::FlattenNode;
37use crate::query_planner::PARALLEL_SPAN_NAME;
38use crate::query_planner::Primary;
39use crate::query_planner::SEQUENCE_SPAN_NAME;
40use crate::query_planner::fetch::FetchNode;
41use crate::query_planner::fetch::SubgraphSchemas;
42use crate::query_planner::fetch::Variables;
43use crate::services::FetchRequest;
44use crate::services::fetch;
45use crate::services::fetch::ErrorMapping;
46use crate::services::fetch::SubscriptionRequest;
47use crate::services::fetch_service::FetchServiceFactory;
48use crate::services::new_service::ServiceFactory;
49use crate::spec::Query;
50use crate::spec::Schema;
51
52impl QueryPlan {
53    #[allow(clippy::too_many_arguments)]
54    /// Execute the plan and return a [`Response`].
55    pub(crate) async fn execute<'a>(
56        &self,
57        context: &'a Context,
58        service_factory: &'a Arc<FetchServiceFactory>,
59        // The original supergraph request is used to populate variable values and for plugin
60        // features like propagating headers or subgraph telemetry based on supergraph request
61        // values.
62        supergraph_request: &'a Arc<http::Request<Request>>,
63        schema: &'a Arc<Schema>,
64        subgraph_schemas: &'a Arc<SubgraphSchemas>,
65        // Sender for additional responses past the first one (@defer, @stream, subscriptions)
66        sender: mpsc::Sender<Response>,
67        subscription_handle: Option<SubscriptionHandle>,
68        subscription_config: &'a Option<SubscriptionConfig>,
69        // Query plan execution builds up a JSON result value, use this as the initial data.
70        initial_value: Option<Value>,
71    ) -> Response {
72        let root = Path::empty();
73
74        log::trace_query_plan(&self.root);
75        let deferred_fetches = HashMap::new();
76
77        let (value, errors) = self
78            .root
79            .execute_recursively(
80                &ExecutionParameters {
81                    context,
82                    service_factory,
83                    schema,
84                    supergraph_request,
85                    deferred_fetches: &deferred_fetches,
86                    query: &self.query,
87                    root_node: &self.root,
88                    subscription_handle: &subscription_handle,
89                    subscription_config,
90                    subgraph_schemas,
91                },
92                &root,
93                &initial_value.unwrap_or_default(),
94                sender,
95            )
96            .await;
97        if !deferred_fetches.is_empty() {
98            u64_counter!(
99                "apollo.router.operations.defer",
100                "Number of requests that request deferred data",
101                1
102            );
103        }
104
105        Response::builder().data(value).errors(errors).build()
106    }
107
108    pub fn contains_mutations(&self) -> bool {
109        self.root.contains_mutations()
110    }
111
112    pub fn subgraph_fetches(&self) -> usize {
113        self.root.subgraph_fetches()
114    }
115}
116
/// Holds the query plan execution arguments that do not change between calls.
///
/// A single instance is shared by reference across the recursive execution of a plan; a new
/// instance is only built when the set of deferred fetches changes (when entering a `Defer`
/// node or when running a deferred node).
pub(crate) struct ExecutionParameters<'a> {
    /// Request context; cloned into every fetch and subscription request, and checked for a
    /// `CanceledRequest` marker before fetching.
    pub(crate) context: &'a Context,
    /// Factory used to create one service per fetch or subscription request.
    pub(crate) service_factory: &'a Arc<FetchServiceFactory>,
    /// Schema used for type-aware merging of results and when building request variables.
    pub(crate) schema: &'a Arc<Schema>,
    /// Subgraph schemas; in this file they are only forwarded to nested executions.
    pub(crate) subgraph_schemas: &'a Arc<SubgraphSchemas>,
    /// The original supergraph request, used for variable values and forwarded to subgraph
    /// requests.
    pub(crate) supergraph_request: &'a Arc<http::Request<Request>>,
    /// Broadcast senders keyed by the id that deferred nodes list in their `depends`; handed to
    /// `FetchNode::deferred_fetches` together with the fetch node id after each fetch completes.
    pub(crate) deferred_fetches: &'a HashMap<String, broadcast::Sender<(Value, Vec<Error>)>>,
    /// The query being executed; used to resolve the variable of `Condition` nodes.
    pub(crate) query: &'a Arc<Query>,
    /// Root node of the plan; in this file it is only forwarded to nested executions.
    pub(crate) root_node: &'a PlanNode,
    /// Handle required to execute `Subscription` nodes; when `None`, such nodes produce an error.
    pub(crate) subscription_handle: &'a Option<SubscriptionHandle>,
    /// Optional subscription configuration, forwarded into subscription requests.
    pub(crate) subscription_config: &'a Option<SubscriptionConfig>,
}
130
131impl PlanNode {
132    pub(super) fn execute_recursively<'a>(
133        &'a self,
134        parameters: &'a ExecutionParameters<'a>,
135        current_dir: &'a Path,
136        parent_value: &'a Value,
137        sender: mpsc::Sender<Response>,
138    ) -> future::BoxFuture<'a, (Value, Vec<Error>)> {
139        Box::pin(async move {
140            tracing::trace!("executing plan:\n{:#?}", self);
141            let mut value;
142            let mut errors;
143
144            match self {
145                PlanNode::Sequence { nodes } => {
146                    value = parent_value.clone();
147                    errors = Vec::new();
148                    async {
149                        for node in nodes {
150                            let (v, err) = node
151                                .execute_recursively(
152                                    parameters,
153                                    current_dir,
154                                    &value,
155                                    sender.clone(),
156                                )
157                                .in_current_span()
158                                .await;
159                            value.type_aware_deep_merge(v, parameters.schema);
160                            errors.extend(err.into_iter());
161                        }
162                    }
163                    .instrument(tracing::info_span!(
164                        SEQUENCE_SPAN_NAME,
165                        "otel.kind" = "INTERNAL"
166                    ))
167                    .await
168                }
169                PlanNode::Parallel { nodes } => {
170                    value = Value::default();
171                    errors = Vec::new();
172                    async {
173                        let mut stream: stream::FuturesUnordered<_> = nodes
174                            .iter()
175                            .map(|plan| {
176                                plan.execute_recursively(
177                                    parameters,
178                                    current_dir,
179                                    parent_value,
180                                    sender.clone(),
181                                )
182                                .in_current_span()
183                            })
184                            .collect();
185
186                        while let Some((v, err)) = stream.next().in_current_span().await {
187                            value.type_aware_deep_merge(v, parameters.schema);
188                            errors.extend(err.into_iter());
189                        }
190                    }
191                    .instrument(tracing::info_span!(
192                        PARALLEL_SPAN_NAME,
193                        "otel.kind" = "INTERNAL"
194                    ))
195                    .await
196                }
197                PlanNode::Flatten(FlattenNode { path, node }) => {
198                    // Note that the span must be `info` as we need to pick this up in apollo tracing
199                    let current_dir = current_dir.join(path.remove_empty_key_root());
200                    let (v, err) = node
201                        .execute_recursively(
202                            parameters,
203                            // this is the only command that actually changes the "current dir"
204                            &current_dir,
205                            parent_value,
206                            sender,
207                        )
208                        .instrument(tracing::info_span!(
209                            FLATTEN_SPAN_NAME,
210                            "graphql.path" = %current_dir,
211                            "otel.kind" = "INTERNAL"
212                        ))
213                        .await;
214                    value = v;
215                    errors = err;
216                }
217                PlanNode::Subscription {
218                    primary: subscription_node,
219                    ..
220                } => {
221                    if parameters.subscription_handle.is_none() {
222                        tracing::error!("No subscription handle provided for a subscription");
223                        value = Value::default();
224                        errors = vec![
225                            Error::builder()
226                                .message("no subscription handle provided for a subscription")
227                                .extension_code("NO_SUBSCRIPTION_HANDLE")
228                                .build(),
229                        ];
230                    } else {
231                        match Variables::new(
232                            &[],
233                            &subscription_node.variable_usages,
234                            parent_value,
235                            current_dir,
236                            parameters.supergraph_request,
237                            parameters.schema,
238                            &subscription_node.input_rewrites,
239                            &None,
240                        ) {
241                            Some(variables) => {
242                                let service = parameters.service_factory.create();
243                                let request = fetch::Request::Subscription(
244                                    SubscriptionRequest::builder()
245                                        .context(parameters.context.clone())
246                                        .subscription_node(subscription_node.clone())
247                                        .supergraph_request(parameters.supergraph_request.clone())
248                                        .variables(variables)
249                                        .current_dir(current_dir.clone())
250                                        .sender(sender)
251                                        .and_subscription_handle(
252                                            parameters.subscription_handle.clone(),
253                                        )
254                                        .and_subscription_config(
255                                            parameters.subscription_config.clone(),
256                                        )
257                                        .build(),
258                                );
259                                (value, errors) =
260                                    match service.oneshot(request).await.map_to_graphql_error(
261                                        subscription_node.service_name.to_string(),
262                                        current_dir,
263                                    ) {
264                                        Ok(r) => r,
265                                        Err(e) => (Value::default(), vec![e]),
266                                    };
267                            }
268                            None => {
269                                value = Value::Object(Object::default());
270                                errors = Vec::new();
271                            }
272                        };
273                    }
274                }
275                PlanNode::Fetch(fetch_node) => {
276                    // The client closed the connection, we are still executing the request pipeline,
277                    // but we won't send unused trafic to subgraph
278                    if parameters
279                        .context
280                        .extensions()
281                        .with_lock(|lock| lock.get::<CanceledRequest>().is_some())
282                    {
283                        value = Value::Object(Object::default());
284                        errors = Vec::new();
285                    } else {
286                        match Variables::new(
287                            &fetch_node.requires,
288                            &fetch_node.variable_usages,
289                            parent_value,
290                            current_dir,
291                            parameters.supergraph_request,
292                            parameters.schema.as_ref(),
293                            &fetch_node.input_rewrites,
294                            &fetch_node.context_rewrites,
295                        ) {
296                            Some(variables) => {
297                                let paths = variables.inverted_paths.clone();
298                                let service = parameters.service_factory.create();
299                                let request = fetch::Request::Fetch(
300                                    FetchRequest::builder()
301                                        .context(parameters.context.clone())
302                                        .fetch_node(fetch_node.clone())
303                                        .supergraph_request(parameters.supergraph_request.clone())
304                                        .variables(variables)
305                                        .current_dir(current_dir.clone())
306                                        .build(),
307                                );
308                                let raw_errors;
309                                (value, raw_errors) =
310                                    match service.oneshot(request).await.map_to_graphql_error(
311                                        fetch_node.service_name.to_string(),
312                                        current_dir,
313                                    ) {
314                                        Ok(r) => r,
315                                        Err(e) => (Value::default(), vec![e]),
316                                    };
317
318                                // When a subgraph returns an unexpected response (ie not a body with
319                                // at least one of errors or data), the errors surfaced by the router
320                                // include an @ in the path. This indicates the error should be applied
321                                // to all elements in the array.
322                                errors = Vec::default();
323                                for err in raw_errors {
324                                    if let Some(err_path) = err.path.as_ref()
325                                        && err_path
326                                            .iter()
327                                            .any(|elem| matches!(elem, PathElement::Flatten(_)))
328                                    {
329                                        for path in paths.iter().flatten() {
330                                            if err_path.equal_if_flattened(path) {
331                                                let mut err = err.clone();
332                                                err.path = Some(path.clone());
333                                                errors.push(err);
334                                            }
335                                        }
336
337                                        continue;
338                                    }
339
340                                    errors.push(err);
341                                }
342
343                                FetchNode::deferred_fetches(
344                                    current_dir,
345                                    &fetch_node.id,
346                                    parameters.deferred_fetches,
347                                    &value,
348                                    &errors,
349                                );
350                            }
351                            None => {
352                                value = Value::Object(Object::default());
353                                errors = Vec::new();
354                            }
355                        };
356                    }
357                }
358                PlanNode::Defer {
359                    primary: Primary { node, .. },
360                    deferred,
361                } => {
362                    value = parent_value.clone();
363                    errors = Vec::new();
364                    async {
365                        let mut deferred_fetches: HashMap<
366                            String,
367                            broadcast::Sender<(Value, Vec<Error>)>,
368                        > = HashMap::new();
369                        let mut futures = Vec::new();
370
371                        let (primary_sender, _) =
372                            tokio::sync::broadcast::channel::<(Value, Vec<Error>)>(1);
373
374                        for deferred_node in deferred {
375                            let fut = deferred_node
376                                .execute(
377                                    parameters,
378                                    parent_value,
379                                    sender.clone(),
380                                    &primary_sender,
381                                    &mut deferred_fetches,
382                                )
383                                .in_current_span();
384
385                            futures.push(fut);
386                        }
387
388                        tokio::task::spawn(async move {
389                            join_all(futures).await;
390                        });
391
392                        if let Some(node) = node {
393                            let (v, err) = node
394                                .execute_recursively(
395                                    &ExecutionParameters {
396                                        context: parameters.context,
397                                        service_factory: parameters.service_factory,
398                                        schema: parameters.schema,
399                                        supergraph_request: parameters.supergraph_request,
400                                        deferred_fetches: &deferred_fetches,
401                                        query: parameters.query,
402                                        root_node: parameters.root_node,
403                                        subscription_handle: parameters.subscription_handle,
404                                        subscription_config: parameters.subscription_config,
405                                        subgraph_schemas: parameters.subgraph_schemas,
406                                    },
407                                    current_dir,
408                                    &value,
409                                    sender,
410                                )
411                                .instrument(tracing::info_span!(
412                                    DEFER_PRIMARY_SPAN_NAME,
413                                    "otel.kind" = "INTERNAL"
414                                ))
415                                .await;
416                            value.type_aware_deep_merge(v, parameters.schema);
417                            errors.extend(err.into_iter());
418
419                            let _ = primary_sender.send((value.clone(), errors.clone()));
420                        } else {
421                            let _ = primary_sender.send((value.clone(), errors.clone()));
422                            // primary response should be an empty object
423                            value.deep_merge(Value::Object(Default::default()));
424                        }
425                    }
426                    .instrument(tracing::info_span!(
427                        DEFER_SPAN_NAME,
428                        "otel.kind" = "INTERNAL"
429                    ))
430                    .await
431                }
432                PlanNode::Condition {
433                    condition,
434                    if_clause,
435                    else_clause,
436                } => {
437                    value = Value::default();
438                    errors = Vec::new();
439
440                    async {
441                        let v = parameters
442                            .query
443                            .variable_value(
444                                condition.as_str(),
445                                &parameters.supergraph_request.body().variables,
446                            )
447                            .unwrap_or(&Value::Bool(true)); // the defer if clause is mandatory, and defaults to true
448
449                        if let &Value::Bool(true) = v {
450                            //FIXME: should we show an error if the if_node was not present?
451                            if let Some(node) = if_clause {
452                                let (v, err) = node
453                                    .execute_recursively(
454                                        parameters,
455                                        current_dir,
456                                        parent_value,
457                                        sender.clone(),
458                                    )
459                                    .instrument(tracing::info_span!(
460                                        CONDITION_IF_SPAN_NAME,
461                                        "otel.kind" = "INTERNAL"
462                                    ))
463                                    .await;
464                                value.type_aware_deep_merge(v, parameters.schema);
465                                errors.extend(err.into_iter());
466                            } else if current_dir.is_empty() {
467                                // If the condition is on the root selection set and it's the only one
468                                // For queries like {get @skip(if: true) {id name}}
469                                value.deep_merge(Value::Object(Default::default()));
470                            }
471                        } else if let Some(node) = else_clause {
472                            let (v, err) = node
473                                .execute_recursively(
474                                    parameters,
475                                    current_dir,
476                                    parent_value,
477                                    sender.clone(),
478                                )
479                                .instrument(tracing::info_span!(
480                                    CONDITION_ELSE_SPAN_NAME,
481                                    "otel.kind" = "INTERNAL"
482                                ))
483                                .await;
484                            value.type_aware_deep_merge(v, parameters.schema);
485                            errors.extend(err.into_iter());
486                        } else if current_dir.is_empty() {
487                            // If the condition is on the root selection set and it's the only one
488                            // For queries like {get @include(if: false) {id name}}
489                            value.deep_merge(Value::Object(Default::default()));
490                        }
491                    }
492                    .instrument(tracing::info_span!(
493                        CONDITION_SPAN_NAME,
494                        "graphql.condition" = condition,
495                        "otel.kind" = "INTERNAL"
496                    ))
497                    .await
498                }
499            }
500
501            (value, errors)
502        })
503    }
504}
505
506impl DeferredNode {
507    fn execute<'a>(
508        &self,
509        parameters: &'a ExecutionParameters<'a>,
510        parent_value: &Value,
511        sender: mpsc::Sender<Response>,
512        primary_sender: &broadcast::Sender<(Value, Vec<Error>)>,
513        deferred_fetches: &mut HashMap<String, broadcast::Sender<(Value, Vec<Error>)>>,
514    ) -> impl Future<Output = ()> + use<> {
515        let mut deferred_receivers = Vec::new();
516
517        for d in self.depends.iter() {
518            match deferred_fetches.get(&d.id) {
519                None => {
520                    let (sender, receiver) = tokio::sync::broadcast::channel(1);
521                    deferred_fetches.insert(d.id.clone(), sender.clone());
522                    deferred_receivers.push(StreamExt::into_future(BroadcastStream::new(receiver)));
523                }
524                Some(sender) => {
525                    let receiver = sender.subscribe();
526                    deferred_receivers.push(StreamExt::into_future(BroadcastStream::new(receiver)));
527                }
528            }
529        }
530
531        // if a deferred node has no depends (ie not waiting for data from fetches) then it has to
532        // wait until the primary response is entirely created.
533        //
534        // If the depends list is not empty, the inner node can start working on the fetched data, then
535        // it is merged into the primary response before applying the subselection
536        let is_depends_empty = self.depends.is_empty();
537
538        let mut stream: stream::FuturesUnordered<_> = deferred_receivers.into_iter().collect();
539        //FIXME/ is there a solution without cloning the entire node? Maybe it could be moved instead?
540        let deferred_inner = self.node.clone();
541        let deferred_path = self.query_path.clone();
542        let label = self.label.as_ref().map(|l| l.to_string());
543        let tx = sender;
544        let sc = parameters.schema.clone();
545        let subgraph_schemas = parameters.subgraph_schemas.clone();
546        let orig = parameters.supergraph_request.clone();
547        let sf = parameters.service_factory.clone();
548        let root_node = parameters.root_node.clone();
549        let ctx = parameters.context.clone();
550        let query = parameters.query.clone();
551        let subscription_handle = parameters.subscription_handle.clone();
552        let subscription_config = parameters.subscription_config.clone();
553        let mut primary_receiver = primary_sender.subscribe();
554        let mut value = parent_value.clone();
555        let depends_json = serde_json::to_string(&self.depends).unwrap_or_default();
556        async move {
557            let mut errors = Vec::new();
558
559            if is_depends_empty {
560                let (primary_value, primary_errors) =
561                    primary_receiver.recv().await.unwrap_or_default();
562                value.type_aware_deep_merge(primary_value, &sc);
563                errors.extend(primary_errors)
564            } else {
565                while let Some((v, _remaining)) = stream.next().await {
566                    // a Err(RecvError) means either that the fetch was not performed and the
567                    // sender was dropped, possibly because there was no need to do it,
568                    // or because it is lagging, but here we only send one message so it
569                    // will not happen
570                    if let Some(Ok((deferred_value, err))) = v {
571                        value.type_aware_deep_merge(deferred_value, &sc);
572                        errors.extend(err.into_iter())
573                    }
574                }
575            }
576
577            let deferred_fetches = HashMap::new();
578
579            if let Some(node) = deferred_inner {
580                let (mut v, err) = node
581                    .execute_recursively(
582                        &ExecutionParameters {
583                            context: &ctx,
584                            service_factory: &sf,
585                            schema: &sc,
586                            supergraph_request: &orig,
587                            deferred_fetches: &deferred_fetches,
588                            query: &query,
589                            root_node: &root_node,
590                            subscription_handle: &subscription_handle,
591                            subscription_config: &subscription_config,
592                            subgraph_schemas: &subgraph_schemas,
593                        },
594                        &Path::default(),
595                        &value,
596                        tx.clone(),
597                    )
598                    .instrument(tracing::info_span!(
599                        DEFER_DEFERRED_SPAN_NAME,
600                        "graphql.label" = label,
601                        "graphql.depends" = depends_json,
602                        "graphql.path" = deferred_path.to_string(),
603                        "otel.kind" = "INTERNAL"
604                    ))
605                    .await;
606
607                if !is_depends_empty {
608                    let (primary_value, primary_errors) =
609                        primary_receiver.recv().await.unwrap_or_default();
610                    v.type_aware_deep_merge(primary_value, &sc);
611                    errors.extend(primary_errors)
612                }
613
614                if let Err(e) = tx
615                    .send(
616                        Response::builder()
617                            .data(v)
618                            .errors(err)
619                            .and_path(Some(deferred_path.clone()))
620                            .and_label(label)
621                            .build(),
622                    )
623                    .await
624                {
625                    tracing::error!(
626                        "error sending deferred response at path {}: {:?}",
627                        deferred_path,
628                        e
629                    );
630                };
631                drop(tx);
632            } else {
633                let (primary_value, primary_errors) =
634                    primary_receiver.recv().await.unwrap_or_default();
635                value.type_aware_deep_merge(primary_value, &sc);
636                errors.extend(primary_errors);
637
638                if let Err(e) = tx
639                    .send(
640                        Response::builder()
641                            .data(value)
642                            .errors(errors)
643                            .and_path(Some(deferred_path.clone()))
644                            .and_label(label)
645                            .build(),
646                    )
647                    .await
648                {
649                    tracing::error!(
650                        "error sending deferred response at path {}: {:?}",
651                        deferred_path,
652                        e
653                    );
654                }
655                drop(tx);
656            };
657        }
658    }
659}