apollo_router/query_planner/
plan.rs

1use std::sync::Arc;
2use std::sync::atomic::AtomicUsize;
3use std::sync::atomic::Ordering;
4
5use apollo_compiler::collections::HashSet;
6use apollo_compiler::validation::Valid;
7use serde::Deserialize;
8use serde::Serialize;
9
10pub(crate) use self::fetch::OperationKind;
11use super::fetch;
12use super::subscription::SubscriptionNode;
13use crate::apollo_studio_interop::UsageReporting;
14use crate::cache::estimate_size;
15use crate::configuration::Batching;
16use crate::error::CacheResolverError;
17use crate::error::ValidationErrors;
18use crate::json_ext::Object;
19use crate::json_ext::Path;
20use crate::json_ext::Value;
21use crate::plugins::authorization::CacheKeyMetadata;
22use crate::query_planner::fetch::SubgraphSchemas;
23use crate::services::query_planner::PlanOptions;
24use crate::spec::Query;
25use crate::spec::QueryHash;
26use crate::spec::operation_limits::OperationLimits;
27
/// A planner key.
///
/// This type contains everything needed to separate query plan cache entries.
#[derive(Clone)]
pub(crate) struct QueryKey {
    /// Filtered form of the query text.
    // NOTE(review): the filtering step is not visible in this file — confirm what it removes.
    pub(crate) filtered_query: String,
    /// The query text before filtering.
    // NOTE(review): presumably the query exactly as received from the client — confirm.
    pub(crate) original_query: String,
    /// Name of the operation being planned, if one was given.
    pub(crate) operation_name: Option<String>,
    /// Authorization-related metadata (the type comes from the authorization plugin)
    /// that takes part in the key.
    pub(crate) metadata: CacheKeyMetadata,
    /// Planner options that take part in the key.
    pub(crate) plan_options: PlanOptions,
}
39
/// A plan for a given GraphQL query
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct QueryPlan {
    /// Usage reporting data attached to this plan.
    pub(crate) usage_reporting: Arc<UsageReporting>,
    /// Root node of the plan tree.
    pub(crate) root: Arc<PlanNode>,
    /// String representation of the query plan (not a json representation)
    pub(crate) formatted_query_plan: Option<Arc<String>>,
    /// The query this plan belongs to. It is consulted for the operation kind and
    /// for resolving the variables that drive `Condition` nodes.
    pub(crate) query: Arc<Query>,
    // NOTE(review): presumably the operation-limit measurements of the query — confirm.
    pub(crate) query_metrics: OperationLimits<u32>,

    /// The estimated size in bytes of the query plan
    ///
    /// A value of zero means "not computed yet": `QueryPlan::estimated_size` computes
    /// and stores the estimate on first use. Because the counter lives behind an `Arc`,
    /// clones of the plan share the cached value. `#[serde(default)]` lets serialized
    /// plans that lack this field still deserialize.
    #[serde(default)]
    pub(crate) estimated_size: Arc<AtomicUsize>,
}
54
55/// This default impl is useful for test users
56/// who will need `QueryPlan`s to work with the `QueryPlannerService` and the `ExecutionService`
57#[buildstructor::buildstructor]
58impl QueryPlan {
59    #[builder]
60    pub(crate) fn fake_new(
61        root: Option<PlanNode>,
62        usage_reporting: Option<UsageReporting>,
63    ) -> Self {
64        Self {
65            usage_reporting: usage_reporting
66                .unwrap_or_else(|| UsageReporting::Error("this is a test report key".to_string()))
67                .into(),
68            root: Arc::new(root.unwrap_or_else(|| PlanNode::Sequence { nodes: Vec::new() })),
69            formatted_query_plan: Default::default(),
70            query: Arc::new(Query::empty_for_tests()),
71            query_metrics: Default::default(),
72            estimated_size: Default::default(),
73        }
74    }
75}
76
77impl QueryPlan {
78    pub(crate) fn is_deferred(&self, variables: &Object) -> bool {
79        self.root.is_deferred(variables, &self.query)
80    }
81
82    pub(crate) fn is_subscription(&self) -> bool {
83        matches!(self.query.operation.kind(), OperationKind::Subscription)
84    }
85
86    pub(crate) fn query_hashes(
87        &self,
88        batching_config: Batching,
89        variables: &Object,
90    ) -> Result<Vec<Arc<QueryHash>>, CacheResolverError> {
91        self.root
92            .query_hashes(batching_config, variables, &self.query)
93    }
94
95    pub(crate) fn estimated_size(&self) -> usize {
96        if self.estimated_size.load(Ordering::SeqCst) == 0 {
97            self.estimated_size
98                .store(estimate_size(self), Ordering::SeqCst);
99        }
100        self.estimated_size.load(Ordering::SeqCst)
101    }
102}
103
/// Query plans are composed of a set of nodes.
///
/// The serialized form is internally tagged: the variant name is stored in a
/// `kind` field next to the variant's own fields.
#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "PascalCase", tag = "kind")]
pub(crate) enum PlanNode {
    /// These nodes must be executed in order.
    Sequence {
        /// The plan nodes that make up the sequence execution.
        nodes: Vec<PlanNode>,
    },

    /// These nodes may be executed in parallel.
    Parallel {
        /// The plan nodes that make up the parallel execution.
        nodes: Vec<PlanNode>,
    },

    /// Fetch some data from a subgraph.
    Fetch(fetch::FetchNode),

    /// Merge the current resultset with the response.
    Flatten(FlattenNode),

    /// A plan split into a primary (non deferred) part and a list of deferred parts.
    Defer {
        /// The non deferred part of the plan.
        primary: Primary,
        /// The deferred parts of the plan.
        deferred: Vec<DeferredNode>,
    },

    /// A subscription, optionally followed by more plan nodes.
    Subscription {
        /// The subscription itself; it is counted as one subgraph request.
        primary: SubscriptionNode,
        /// The remainder of the plan, if any.
        rest: Option<Box<PlanNode>>,
    },

    /// A branch selected by the value of a variable.
    ///
    /// `if_clause` is followed when the variable is `true` or cannot be resolved,
    /// `else_clause` otherwise. Fields are serialized in camelCase (`ifClause`,
    /// `elseClause`).
    #[serde(rename_all = "camelCase")]
    Condition {
        /// Name of the variable that drives the branch.
        condition: String,
        /// Plan to follow when the condition holds.
        if_clause: Option<Box<PlanNode>>,
        /// Plan to follow when the condition does not hold.
        else_clause: Option<Box<PlanNode>>,
    },
}
143
144impl PlanNode {
145    pub(crate) fn contains_mutations(&self) -> bool {
146        match self {
147            Self::Sequence { nodes } => nodes.iter().any(|n| n.contains_mutations()),
148            Self::Parallel { nodes } => nodes.iter().any(|n| n.contains_mutations()),
149            Self::Fetch(fetch_node) => fetch_node.operation_kind() == &OperationKind::Mutation,
150            Self::Defer { primary, .. } => primary
151                .node
152                .as_ref()
153                .map(|n| n.contains_mutations())
154                .unwrap_or(false),
155            Self::Subscription { .. } => false,
156            Self::Flatten(_) => false,
157            Self::Condition {
158                if_clause,
159                else_clause,
160                ..
161            } => {
162                if let Some(node) = if_clause
163                    && node.contains_mutations()
164                {
165                    return true;
166                }
167                if let Some(node) = else_clause
168                    && node.contains_mutations()
169                {
170                    return true;
171                }
172                false
173            }
174        }
175    }
176
177    pub(crate) fn is_deferred(&self, variables: &Object, query: &Query) -> bool {
178        match self {
179            Self::Sequence { nodes } => nodes.iter().any(|n| n.is_deferred(variables, query)),
180            Self::Parallel { nodes } => nodes.iter().any(|n| n.is_deferred(variables, query)),
181            Self::Flatten(node) => node.node.is_deferred(variables, query),
182            Self::Fetch(..) => false,
183            Self::Defer { .. } => true,
184            Self::Subscription { .. } => false,
185            Self::Condition {
186                if_clause,
187                else_clause,
188                condition,
189            } => {
190                if query
191                    .variable_value(condition.as_str(), variables)
192                    .map(|v| *v == Value::Bool(true))
193                    .unwrap_or(true)
194                {
195                    // right now ConditionNode is only used with defer, but it might be used
196                    // in the future to implement @skip and @include execution
197                    if let Some(node) = if_clause
198                        && node.is_deferred(variables, query)
199                    {
200                        return true;
201                    }
202                } else if let Some(node) = else_clause
203                    && node.is_deferred(variables, query)
204                {
205                    return true;
206                }
207
208                false
209            }
210        }
211    }
212
213    /// Iteratively populate a Vec of QueryHashes representing Fetches in this plan.
214    ///
215    /// Do not include any operations which contain "requires" elements.
216    ///
217    /// This function is specifically designed to be used within the context of simple batching. It
218    /// explicitly fails if nodes which should *not* be encountered within that context are
219    /// encountered. e.g.: PlanNode::Defer
220    ///
221    /// It's unlikely/impossible that PlanNode::Defer or PlanNode::Subscription will ever be
222    /// supported, but it may be that PlanNode::Condition must eventually be supported (or other
223    /// new nodes types that are introduced). Explicitly fail each type to provide extra error
224    /// details and don't use _ so that future node types must be handled here.
225    pub(crate) fn query_hashes(
226        &self,
227        batching_config: Batching,
228        variables: &Object,
229        query: &Query,
230    ) -> Result<Vec<Arc<QueryHash>>, CacheResolverError> {
231        let mut query_hashes = vec![];
232        let mut new_targets = vec![self];
233
234        loop {
235            let targets = new_targets;
236            if targets.is_empty() {
237                break;
238            }
239
240            new_targets = vec![];
241            for target in targets {
242                match target {
243                    PlanNode::Sequence { nodes } | PlanNode::Parallel { nodes } => {
244                        new_targets.extend(nodes);
245                    }
246                    PlanNode::Fetch(node) => {
247                        // If requires.is_empty() we may be able to batch it!
248                        if node.requires.is_empty()
249                            && batching_config.batch_include(&node.service_name)
250                        {
251                            query_hashes.push(node.schema_aware_hash.clone());
252                        }
253                    }
254                    PlanNode::Flatten(node) => new_targets.push(&node.node),
255                    PlanNode::Defer { .. } => {
256                        return Err(CacheResolverError::BatchingError(
257                            "unexpected defer node encountered during query_hash processing"
258                                .to_string(),
259                        ));
260                    }
261                    PlanNode::Subscription { .. } => {
262                        return Err(CacheResolverError::BatchingError(
263                            "unexpected subscription node encountered during query_hash processing"
264                                .to_string(),
265                        ));
266                    }
267                    PlanNode::Condition {
268                        if_clause,
269                        else_clause,
270                        condition,
271                    } => {
272                        if query
273                            .variable_value(condition.as_str(), variables)
274                            .map(|v| *v == Value::Bool(true))
275                            .unwrap_or(true)
276                        {
277                            if let Some(node) = if_clause {
278                                new_targets.push(node);
279                            }
280                        } else if let Some(node) = else_clause {
281                            new_targets.push(node);
282                        }
283                    }
284                }
285            }
286        }
287        Ok(query_hashes)
288    }
289
290    pub(crate) fn subgraph_fetches(&self) -> usize {
291        match self {
292            PlanNode::Sequence { nodes } => nodes.iter().map(|n| n.subgraph_fetches()).sum(),
293            PlanNode::Parallel { nodes } => nodes.iter().map(|n| n.subgraph_fetches()).sum(),
294            PlanNode::Fetch(_) => 1,
295            PlanNode::Flatten(node) => node.node.subgraph_fetches(),
296            PlanNode::Defer { primary, deferred } => {
297                primary.node.as_ref().map_or(0, |n| n.subgraph_fetches())
298                    + deferred
299                        .iter()
300                        .map(|n| n.node.as_ref().map_or(0, |n| n.subgraph_fetches()))
301                        .sum::<usize>()
302            }
303            // A `SubscriptionNode` makes a request to a subgraph, so counting it as 1
304            PlanNode::Subscription { rest, .. } => {
305                rest.as_ref().map_or(0, |n| n.subgraph_fetches()) + 1
306            }
307            // Compute the highest possible value for condition nodes
308            PlanNode::Condition {
309                if_clause,
310                else_clause,
311                ..
312            } => std::cmp::max(
313                if_clause
314                    .as_ref()
315                    .map(|n| n.subgraph_fetches())
316                    .unwrap_or(0),
317                else_clause
318                    .as_ref()
319                    .map(|n| n.subgraph_fetches())
320                    .unwrap_or(0),
321            ),
322        }
323    }
324
325    pub(crate) fn init_parsed_operations(
326        &mut self,
327        subgraph_schemas: &SubgraphSchemas,
328    ) -> Result<(), ValidationErrors> {
329        match self {
330            PlanNode::Fetch(fetch_node) => {
331                fetch_node.init_parsed_operation(subgraph_schemas)?;
332            }
333
334            PlanNode::Sequence { nodes } => {
335                for node in nodes {
336                    node.init_parsed_operations(subgraph_schemas)?;
337                }
338            }
339            PlanNode::Parallel { nodes } => {
340                for node in nodes {
341                    node.init_parsed_operations(subgraph_schemas)?;
342                }
343            }
344            PlanNode::Flatten(flatten) => flatten.node.init_parsed_operations(subgraph_schemas)?,
345            PlanNode::Defer { primary, deferred } => {
346                if let Some(node) = primary.node.as_mut() {
347                    node.init_parsed_operations(subgraph_schemas)?;
348                }
349                for deferred_node in deferred {
350                    if let Some(node) = &mut deferred_node.node {
351                        Arc::make_mut(node).init_parsed_operations(subgraph_schemas)?;
352                    }
353                }
354            }
355            PlanNode::Subscription { primary, rest } => {
356                primary.init_parsed_operation(subgraph_schemas)?;
357                if let Some(node) = rest.as_mut() {
358                    node.init_parsed_operations(subgraph_schemas)?;
359                }
360            }
361            PlanNode::Condition {
362                condition: _,
363                if_clause,
364                else_clause,
365            } => {
366                if let Some(node) = if_clause.as_mut() {
367                    node.init_parsed_operations(subgraph_schemas)?;
368                }
369                if let Some(node) = else_clause.as_mut() {
370                    node.init_parsed_operations(subgraph_schemas)?;
371                }
372            }
373        }
374        Ok(())
375    }
376
377    pub(crate) fn init_parsed_operations_and_hash_subqueries(
378        &mut self,
379        subgraph_schemas: &SubgraphSchemas,
380    ) -> Result<(), ValidationErrors> {
381        match self {
382            PlanNode::Fetch(fetch_node) => {
383                fetch_node.init_parsed_operation_and_hash_subquery(subgraph_schemas)?;
384            }
385
386            PlanNode::Sequence { nodes } => {
387                for node in nodes {
388                    node.init_parsed_operations_and_hash_subqueries(subgraph_schemas)?;
389                }
390            }
391            PlanNode::Parallel { nodes } => {
392                for node in nodes {
393                    node.init_parsed_operations_and_hash_subqueries(subgraph_schemas)?;
394                }
395            }
396            PlanNode::Flatten(flatten) => flatten
397                .node
398                .init_parsed_operations_and_hash_subqueries(subgraph_schemas)?,
399            PlanNode::Defer { primary, deferred } => {
400                if let Some(node) = primary.node.as_mut() {
401                    node.init_parsed_operations_and_hash_subqueries(subgraph_schemas)?;
402                }
403                for deferred_node in deferred {
404                    if let Some(node) = &mut deferred_node.node {
405                        Arc::make_mut(node)
406                            .init_parsed_operations_and_hash_subqueries(subgraph_schemas)?
407                    }
408                }
409            }
410            PlanNode::Subscription { primary: _, rest } => {
411                if let Some(node) = rest.as_mut() {
412                    node.init_parsed_operations_and_hash_subqueries(subgraph_schemas)?;
413                }
414            }
415            PlanNode::Condition {
416                condition: _,
417                if_clause,
418                else_clause,
419            } => {
420                if let Some(node) = if_clause.as_mut() {
421                    node.init_parsed_operations_and_hash_subqueries(subgraph_schemas)?;
422                }
423                if let Some(node) = else_clause.as_mut() {
424                    node.init_parsed_operations_and_hash_subqueries(subgraph_schemas)?;
425                }
426            }
427        }
428        Ok(())
429    }
430
431    #[cfg(test)]
432    /// Retrieves all the services used across all plan nodes.
433    ///
434    /// Note that duplicates are not filtered.
435    pub(crate) fn service_usage<'a>(&'a self) -> Box<dyn Iterator<Item = &'a str> + 'a> {
436        match self {
437            Self::Sequence { nodes } | Self::Parallel { nodes } => {
438                Box::new(nodes.iter().flat_map(|x| x.service_usage()))
439            }
440            Self::Fetch(fetch) => Box::new(Some(fetch.service_name()).into_iter()),
441            Self::Subscription { primary, rest } => match rest {
442                Some(rest) => Box::new(
443                    rest.service_usage()
444                        .chain(Some(primary.service_name.as_ref())),
445                ) as Box<dyn Iterator<Item = &'a str> + 'a>,
446                None => Box::new(Some(primary.service_name.as_ref()).into_iter()),
447            },
448            Self::Flatten(flatten) => flatten.node.service_usage(),
449            Self::Defer { primary, deferred } => primary
450                .node
451                .as_ref()
452                .map(|n| {
453                    Box::new(
454                        n.service_usage().chain(
455                            deferred
456                                .iter()
457                                .flat_map(|d| d.node.iter().flat_map(|node| node.service_usage())),
458                        ),
459                    ) as Box<dyn Iterator<Item = &'a str> + 'a>
460                })
461                .unwrap_or_else(|| {
462                    Box::new(std::iter::empty()) as Box<dyn Iterator<Item = &'a str> + 'a>
463                }),
464
465            Self::Condition {
466                if_clause,
467                else_clause,
468                ..
469            } => match (if_clause, else_clause) {
470                (None, None) => Box::new(None.into_iter()),
471                (None, Some(node)) => node.service_usage(),
472                (Some(node), None) => node.service_usage(),
473                (Some(if_node), Some(else_node)) => {
474                    Box::new(if_node.service_usage().chain(else_node.service_usage()))
475                }
476            },
477        }
478    }
479
480    /// A version of `service_usage` that doesn't use recursion
481    /// and returns a `HashSet` instead of an `Iterator`.
482    pub(crate) fn service_usage_set(&self) -> HashSet<&str> {
483        let mut services = HashSet::default();
484        let mut stack = vec![self];
485        while let Some(node) = stack.pop() {
486            match node {
487                Self::Sequence { nodes } | Self::Parallel { nodes } => {
488                    stack.extend(nodes.iter());
489                }
490                Self::Fetch(fetch) => {
491                    services.insert(fetch.service_name.as_ref());
492                }
493                Self::Subscription { primary, rest } => {
494                    services.insert(primary.service_name.as_ref());
495                    if let Some(rest) = rest {
496                        stack.push(rest);
497                    }
498                }
499                Self::Flatten(flatten) => {
500                    stack.push(&flatten.node);
501                }
502                Self::Defer { primary, deferred } => {
503                    if let Some(primary) = primary.node.as_ref() {
504                        stack.push(primary);
505                    }
506                    stack.extend(deferred.iter().flat_map(|d| d.node.as_deref()));
507                }
508                Self::Condition {
509                    if_clause,
510                    else_clause,
511                    ..
512                } => {
513                    if let Some(if_clause) = if_clause {
514                        stack.push(if_clause);
515                    }
516                    if let Some(else_clause) = else_clause {
517                        stack.push(else_clause);
518                    }
519                }
520            }
521        }
522        services
523    }
524
525    pub(crate) fn extract_authorization_metadata(
526        &mut self,
527        schema: &Valid<apollo_compiler::Schema>,
528        key: &CacheKeyMetadata,
529    ) {
530        match self {
531            PlanNode::Fetch(fetch_node) => {
532                fetch_node.extract_authorization_metadata(schema, key);
533            }
534
535            PlanNode::Sequence { nodes } => {
536                for node in nodes {
537                    node.extract_authorization_metadata(schema, key);
538                }
539            }
540            PlanNode::Parallel { nodes } => {
541                for node in nodes {
542                    node.extract_authorization_metadata(schema, key);
543                }
544            }
545            PlanNode::Flatten(flatten) => flatten.node.extract_authorization_metadata(schema, key),
546            PlanNode::Defer { primary, deferred } => {
547                if let Some(node) = primary.node.as_mut() {
548                    node.extract_authorization_metadata(schema, key);
549                }
550                for deferred_node in deferred {
551                    if let Some(node) = deferred_node.node.take() {
552                        let mut new_node = (*node).clone();
553                        new_node.extract_authorization_metadata(schema, key);
554                        deferred_node.node = Some(Arc::new(new_node));
555                    }
556                }
557            }
558            PlanNode::Subscription { primary: _, rest } => {
559                if let Some(node) = rest.as_mut() {
560                    node.extract_authorization_metadata(schema, key);
561                }
562            }
563            PlanNode::Condition {
564                condition: _,
565                if_clause,
566                else_clause,
567            } => {
568                if let Some(node) = if_clause.as_mut() {
569                    node.extract_authorization_metadata(schema, key);
570                }
571                if let Some(node) = else_clause.as_mut() {
572                    node.extract_authorization_metadata(schema, key);
573                }
574            }
575        }
576    }
577}
578
/// A flatten node: a child plan together with the path at which its result is
/// merged.
///
/// Fields are serialized in camelCase.
#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct FlattenNode {
    /// The path where the result should be merged.
    pub(crate) path: Path,

    /// The child execution plan.
    pub(crate) node: Box<PlanNode>,
}
589
/// A primary query for a Defer node, the non deferred part
///
/// Fields are serialized in camelCase.
#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct Primary {
    /// The part of the original query that "selects" the data to
    /// send in that primary response (once the plan in `node` completes).
    pub(crate) subselection: Option<String>,

    /// The plan to get all the data for that primary part. `None` when the
    /// primary part has no plan of its own.
    pub(crate) node: Option<Box<PlanNode>>,
}
601
/// The "deferred" parts of the defer (note that it's an array). Each
/// of those deferred elements will correspond to a different chunk of
/// the response to the client (after the initial non-deferred one, that is).
///
/// Fields are serialized in camelCase.
#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct DeferredNode {
    /// References one or more fetch node(s) (by `id`) within
    /// `primary.node`. The plan of this deferred part should not
    /// be started before all those fetches return.
    pub(crate) depends: Vec<Depends>,

    /// The optional defer label.
    pub(crate) label: Option<String>,
    /// Path to the @defer this corresponds to. `subselection` starts at that `path`.
    pub(crate) query_path: Path,
    /// The part of the original query that "selects" the data to send
    /// in that deferred response (once the plan in `node` completes).
    /// Will be set _unless_ `node` is a `DeferNode` itself.
    pub(crate) subselection: Option<String>,
    /// The plan to get all the data for that deferred part. It is held in an
    /// `Arc`, so code that needs to modify it goes through `Arc::make_mut` or
    /// replaces the `Arc`.
    pub(crate) node: Option<Arc<PlanNode>>,
}
624
/// A dependency of a deferred part: a reference, by `id`, to a fetch node that
/// the deferred part waits for (see `DeferredNode::depends`).
#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct Depends {
    /// Identifier of the referenced fetch node.
    pub(crate) id: String,
}
631
#[cfg(test)]
mod test {
    use crate::query_planner::QueryPlan;

    /// The size estimate of a plan must be non-zero, and asking for it a second
    /// time must give the same answer.
    #[test]
    fn test_estimated_size() {
        let plan = QueryPlan::fake_builder().build();

        let first_estimate = plan.estimated_size();
        let second_estimate = plan.estimated_size();

        assert!(first_estimate > 0);
        assert_eq!(first_estimate, second_estimate);
    }
}