Skip to main content

apollo_router/query_planner/
plan.rs

1use std::sync::Arc;
2use std::sync::atomic::AtomicUsize;
3use std::sync::atomic::Ordering;
4
5use apollo_compiler::validation::Valid;
6use serde::Deserialize;
7use serde::Serialize;
8
9pub(crate) use self::fetch::OperationKind;
10use super::fetch;
11use super::subscription::SubscriptionNode;
12use crate::apollo_studio_interop::UsageReporting;
13use crate::cache::estimate_size;
14use crate::configuration::Batching;
15use crate::error::CacheResolverError;
16use crate::error::ValidationErrors;
17use crate::json_ext::Object;
18use crate::json_ext::Path;
19use crate::json_ext::Value;
20use crate::plugins::authorization::CacheKeyMetadata;
21use crate::query_planner::fetch::SubgraphSchemas;
22use crate::services::query_planner::PlanOptions;
23use crate::spec::Query;
24use crate::spec::QueryHash;
25use crate::spec::operation_limits::OperationLimits;
26
/// A planner key.
///
/// This type contains everything needed to separate query plan cache entries
#[derive(Clone)]
pub(crate) struct QueryKey {
    /// The filtered form of the query.
    // NOTE(review): the filtering step is not visible in this file — presumably
    // authorization filtering, given the `metadata` field; confirm against the caller.
    pub(crate) filtered_query: String,
    /// The query as originally received (presumably before filtering — TODO confirm).
    pub(crate) original_query: String,
    /// The operation name, when one is present.
    pub(crate) operation_name: Option<String>,
    /// Authorization metadata that is part of the key.
    pub(crate) metadata: CacheKeyMetadata,
    /// Planner options that are part of the key.
    pub(crate) plan_options: PlanOptions,
}
38
/// A plan for a given GraphQL query
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct QueryPlan {
    /// Usage reporting data attached to this plan.
    pub(crate) usage_reporting: Arc<UsageReporting>,
    /// The root node of the plan tree.
    pub(crate) root: Arc<PlanNode>,
    /// String representation of the query plan (not a json representation)
    pub(crate) formatted_query_plan: Option<Arc<String>>,
    /// The query this plan belongs to; its operation kind and variables are consulted
    /// when inspecting the plan.
    pub(crate) query: Arc<Query>,
    /// Operation-limit measurements recorded for the query.
    // NOTE(review): the exact metrics are defined by `OperationLimits`, which is not
    // visible in this file.
    pub(crate) query_metrics: OperationLimits<u32>,

    /// The estimated size in bytes of the query plan
    ///
    /// A value of zero means "not computed yet"; `estimated_size()` fills it in on demand.
    /// Missing from serialized input, it defaults to zero.
    #[serde(default)]
    pub(crate) estimated_size: Arc<AtomicUsize>,
}
53
54/// This default impl is useful for test users
55/// who will need `QueryPlan`s to work with the `QueryPlannerService` and the `ExecutionService`
56#[buildstructor::buildstructor]
57impl QueryPlan {
58    #[builder]
59    pub(crate) fn fake_new(
60        root: Option<PlanNode>,
61        usage_reporting: Option<UsageReporting>,
62    ) -> Self {
63        Self {
64            usage_reporting: usage_reporting
65                .unwrap_or_else(|| UsageReporting {
66                    stats_report_key: "this is a test report key".to_string(),
67                    referenced_fields_by_type: Default::default(),
68                })
69                .into(),
70            root: Arc::new(root.unwrap_or_else(|| PlanNode::Sequence { nodes: Vec::new() })),
71            formatted_query_plan: Default::default(),
72            query: Arc::new(Query::empty_for_tests()),
73            query_metrics: Default::default(),
74            estimated_size: Default::default(),
75        }
76    }
77}
78
79impl QueryPlan {
80    pub(crate) fn is_deferred(&self, variables: &Object) -> bool {
81        self.root.is_deferred(variables, &self.query)
82    }
83
84    pub(crate) fn is_subscription(&self) -> bool {
85        matches!(self.query.operation.kind(), OperationKind::Subscription)
86    }
87
88    pub(crate) fn query_hashes(
89        &self,
90        batching_config: Batching,
91        variables: &Object,
92    ) -> Result<Vec<Arc<QueryHash>>, CacheResolverError> {
93        self.root
94            .query_hashes(batching_config, variables, &self.query)
95    }
96
97    pub(crate) fn estimated_size(&self) -> usize {
98        if self.estimated_size.load(Ordering::SeqCst) == 0 {
99            self.estimated_size
100                .store(estimate_size(self), Ordering::SeqCst);
101        }
102        self.estimated_size.load(Ordering::SeqCst)
103    }
104}
105
/// Query plans are composed of a set of nodes.
///
/// Serialized through serde as an internally tagged enum: the PascalCase variant name is
/// stored under the `kind` key.
#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "PascalCase", tag = "kind")]
pub(crate) enum PlanNode {
    /// These nodes must be executed in order.
    Sequence {
        /// The plan nodes that make up the sequence execution.
        nodes: Vec<PlanNode>,
    },

    /// These nodes may be executed in parallel.
    Parallel {
        /// The plan nodes that make up the parallel execution.
        nodes: Vec<PlanNode>,
    },

    /// Fetch some data from a subgraph.
    Fetch(fetch::FetchNode),

    /// Merge the current resultset with the response.
    Flatten(FlattenNode),

    /// A plan split into a primary (non deferred) part and a list of deferred parts.
    Defer {
        /// The non deferred part of the plan.
        primary: Primary,
        /// The deferred parts of the plan.
        deferred: Vec<DeferredNode>,
    },

    /// A subscription, optionally followed by the rest of the plan.
    Subscription {
        /// The subscription node itself.
        primary: SubscriptionNode,
        /// The remainder of the plan, if any.
        rest: Option<Box<PlanNode>>,
    },

    /// Selects one of two optional sub-plans depending on a variable.
    ///
    /// The traversals in this file pick `if_clause` when the variable resolves to `true`
    /// or cannot be resolved at all, and `else_clause` otherwise.
    #[serde(rename_all = "camelCase")]
    Condition {
        /// Name of the variable that drives the selection.
        condition: String,
        /// Sub-plan used when the condition holds.
        if_clause: Option<Box<PlanNode>>,
        /// Sub-plan used when the condition does not hold.
        else_clause: Option<Box<PlanNode>>,
    },
}
145
146impl PlanNode {
147    pub(crate) fn contains_mutations(&self) -> bool {
148        match self {
149            Self::Sequence { nodes } => nodes.iter().any(|n| n.contains_mutations()),
150            Self::Parallel { nodes } => nodes.iter().any(|n| n.contains_mutations()),
151            Self::Fetch(fetch_node) => fetch_node.operation_kind() == &OperationKind::Mutation,
152            Self::Defer { primary, .. } => primary
153                .node
154                .as_ref()
155                .map(|n| n.contains_mutations())
156                .unwrap_or(false),
157            Self::Subscription { .. } => false,
158            Self::Flatten(_) => false,
159            Self::Condition {
160                if_clause,
161                else_clause,
162                ..
163            } => {
164                if let Some(node) = if_clause {
165                    if node.contains_mutations() {
166                        return true;
167                    }
168                }
169                if let Some(node) = else_clause {
170                    if node.contains_mutations() {
171                        return true;
172                    }
173                }
174                false
175            }
176        }
177    }
178
179    pub(crate) fn is_deferred(&self, variables: &Object, query: &Query) -> bool {
180        match self {
181            Self::Sequence { nodes } => nodes.iter().any(|n| n.is_deferred(variables, query)),
182            Self::Parallel { nodes } => nodes.iter().any(|n| n.is_deferred(variables, query)),
183            Self::Flatten(node) => node.node.is_deferred(variables, query),
184            Self::Fetch(..) => false,
185            Self::Defer { .. } => true,
186            Self::Subscription { .. } => false,
187            Self::Condition {
188                if_clause,
189                else_clause,
190                condition,
191            } => {
192                if query
193                    .variable_value(condition.as_str(), variables)
194                    .map(|v| *v == Value::Bool(true))
195                    .unwrap_or(true)
196                {
197                    // right now ConditionNode is only used with defer, but it might be used
198                    // in the future to implement @skip and @include execution
199                    if let Some(node) = if_clause {
200                        if node.is_deferred(variables, query) {
201                            return true;
202                        }
203                    }
204                } else if let Some(node) = else_clause {
205                    if node.is_deferred(variables, query) {
206                        return true;
207                    }
208                }
209
210                false
211            }
212        }
213    }
214
215    /// Iteratively populate a Vec of QueryHashes representing Fetches in this plan.
216    ///
217    /// Do not include any operations which contain "requires" elements.
218    ///
219    /// This function is specifically designed to be used within the context of simple batching. It
220    /// explicitly fails if nodes which should *not* be encountered within that context are
221    /// encountered. e.g.: PlanNode::Defer
222    ///
223    /// It's unlikely/impossible that PlanNode::Defer or PlanNode::Subscription will ever be
224    /// supported, but it may be that PlanNode::Condition must eventually be supported (or other
225    /// new nodes types that are introduced). Explicitly fail each type to provide extra error
226    /// details and don't use _ so that future node types must be handled here.
227    pub(crate) fn query_hashes(
228        &self,
229        batching_config: Batching,
230        variables: &Object,
231        query: &Query,
232    ) -> Result<Vec<Arc<QueryHash>>, CacheResolverError> {
233        let mut query_hashes = vec![];
234        let mut new_targets = vec![self];
235
236        loop {
237            let targets = new_targets;
238            if targets.is_empty() {
239                break;
240            }
241
242            new_targets = vec![];
243            for target in targets {
244                match target {
245                    PlanNode::Sequence { nodes } | PlanNode::Parallel { nodes } => {
246                        new_targets.extend(nodes);
247                    }
248                    PlanNode::Fetch(node) => {
249                        // If requires.is_empty() we may be able to batch it!
250                        if node.requires.is_empty()
251                            && batching_config.batch_include(&node.service_name)
252                        {
253                            query_hashes.push(node.schema_aware_hash.clone());
254                        }
255                    }
256                    PlanNode::Flatten(node) => new_targets.push(&node.node),
257                    PlanNode::Defer { .. } => {
258                        return Err(CacheResolverError::BatchingError(
259                            "unexpected defer node encountered during query_hash processing"
260                                .to_string(),
261                        ));
262                    }
263                    PlanNode::Subscription { .. } => {
264                        return Err(CacheResolverError::BatchingError(
265                            "unexpected subscription node encountered during query_hash processing"
266                                .to_string(),
267                        ));
268                    }
269                    PlanNode::Condition {
270                        if_clause,
271                        else_clause,
272                        condition,
273                    } => {
274                        if query
275                            .variable_value(condition.as_str(), variables)
276                            .map(|v| *v == Value::Bool(true))
277                            .unwrap_or(true)
278                        {
279                            if let Some(node) = if_clause {
280                                new_targets.push(node);
281                            }
282                        } else if let Some(node) = else_clause {
283                            new_targets.push(node);
284                        }
285                    }
286                }
287            }
288        }
289        Ok(query_hashes)
290    }
291
292    pub(crate) fn subgraph_fetches(&self) -> usize {
293        match self {
294            PlanNode::Sequence { nodes } => nodes.iter().map(|n| n.subgraph_fetches()).sum(),
295            PlanNode::Parallel { nodes } => nodes.iter().map(|n| n.subgraph_fetches()).sum(),
296            PlanNode::Fetch(_) => 1,
297            PlanNode::Flatten(node) => node.node.subgraph_fetches(),
298            PlanNode::Defer { primary, deferred } => {
299                primary.node.as_ref().map_or(0, |n| n.subgraph_fetches())
300                    + deferred
301                        .iter()
302                        .map(|n| n.node.as_ref().map_or(0, |n| n.subgraph_fetches()))
303                        .sum::<usize>()
304            }
305            // A `SubscriptionNode` makes a request to a subgraph, so counting it as 1
306            PlanNode::Subscription { rest, .. } => {
307                rest.as_ref().map_or(0, |n| n.subgraph_fetches()) + 1
308            }
309            // Compute the highest possible value for condition nodes
310            PlanNode::Condition {
311                if_clause,
312                else_clause,
313                ..
314            } => std::cmp::max(
315                if_clause
316                    .as_ref()
317                    .map(|n| n.subgraph_fetches())
318                    .unwrap_or(0),
319                else_clause
320                    .as_ref()
321                    .map(|n| n.subgraph_fetches())
322                    .unwrap_or(0),
323            ),
324        }
325    }
326
327    pub(crate) fn init_parsed_operations(
328        &mut self,
329        subgraph_schemas: &SubgraphSchemas,
330    ) -> Result<(), ValidationErrors> {
331        match self {
332            PlanNode::Fetch(fetch_node) => {
333                fetch_node.init_parsed_operation(subgraph_schemas)?;
334            }
335
336            PlanNode::Sequence { nodes } => {
337                for node in nodes {
338                    node.init_parsed_operations(subgraph_schemas)?;
339                }
340            }
341            PlanNode::Parallel { nodes } => {
342                for node in nodes {
343                    node.init_parsed_operations(subgraph_schemas)?;
344                }
345            }
346            PlanNode::Flatten(flatten) => flatten.node.init_parsed_operations(subgraph_schemas)?,
347            PlanNode::Defer { primary, deferred } => {
348                if let Some(node) = primary.node.as_mut() {
349                    node.init_parsed_operations(subgraph_schemas)?;
350                }
351                for deferred_node in deferred {
352                    if let Some(node) = &mut deferred_node.node {
353                        Arc::make_mut(node).init_parsed_operations(subgraph_schemas)?;
354                    }
355                }
356            }
357            PlanNode::Subscription { primary, rest } => {
358                primary.init_parsed_operation(subgraph_schemas)?;
359                if let Some(node) = rest.as_mut() {
360                    node.init_parsed_operations(subgraph_schemas)?;
361                }
362            }
363            PlanNode::Condition {
364                condition: _,
365                if_clause,
366                else_clause,
367            } => {
368                if let Some(node) = if_clause.as_mut() {
369                    node.init_parsed_operations(subgraph_schemas)?;
370                }
371                if let Some(node) = else_clause.as_mut() {
372                    node.init_parsed_operations(subgraph_schemas)?;
373                }
374            }
375        }
376        Ok(())
377    }
378
379    pub(crate) fn init_parsed_operations_and_hash_subqueries(
380        &mut self,
381        subgraph_schemas: &SubgraphSchemas,
382    ) -> Result<(), ValidationErrors> {
383        match self {
384            PlanNode::Fetch(fetch_node) => {
385                fetch_node.init_parsed_operation_and_hash_subquery(subgraph_schemas)?;
386            }
387
388            PlanNode::Sequence { nodes } => {
389                for node in nodes {
390                    node.init_parsed_operations_and_hash_subqueries(subgraph_schemas)?;
391                }
392            }
393            PlanNode::Parallel { nodes } => {
394                for node in nodes {
395                    node.init_parsed_operations_and_hash_subqueries(subgraph_schemas)?;
396                }
397            }
398            PlanNode::Flatten(flatten) => flatten
399                .node
400                .init_parsed_operations_and_hash_subqueries(subgraph_schemas)?,
401            PlanNode::Defer { primary, deferred } => {
402                if let Some(node) = primary.node.as_mut() {
403                    node.init_parsed_operations_and_hash_subqueries(subgraph_schemas)?;
404                }
405                for deferred_node in deferred {
406                    if let Some(node) = &mut deferred_node.node {
407                        Arc::make_mut(node)
408                            .init_parsed_operations_and_hash_subqueries(subgraph_schemas)?
409                    }
410                }
411            }
412            PlanNode::Subscription { primary: _, rest } => {
413                if let Some(node) = rest.as_mut() {
414                    node.init_parsed_operations_and_hash_subqueries(subgraph_schemas)?;
415                }
416            }
417            PlanNode::Condition {
418                condition: _,
419                if_clause,
420                else_clause,
421            } => {
422                if let Some(node) = if_clause.as_mut() {
423                    node.init_parsed_operations_and_hash_subqueries(subgraph_schemas)?;
424                }
425                if let Some(node) = else_clause.as_mut() {
426                    node.init_parsed_operations_and_hash_subqueries(subgraph_schemas)?;
427                }
428            }
429        }
430        Ok(())
431    }
432
433    #[cfg(test)]
434    /// Retrieves all the services used across all plan nodes.
435    ///
436    /// Note that duplicates are not filtered.
437    pub(crate) fn service_usage<'a>(&'a self) -> Box<dyn Iterator<Item = &'a str> + 'a> {
438        match self {
439            Self::Sequence { nodes } | Self::Parallel { nodes } => {
440                Box::new(nodes.iter().flat_map(|x| x.service_usage()))
441            }
442            Self::Fetch(fetch) => Box::new(Some(fetch.service_name()).into_iter()),
443            Self::Subscription { primary, rest } => match rest {
444                Some(rest) => Box::new(
445                    rest.service_usage()
446                        .chain(Some(primary.service_name.as_ref())),
447                ) as Box<dyn Iterator<Item = &'a str> + 'a>,
448                None => Box::new(Some(primary.service_name.as_ref()).into_iter()),
449            },
450            Self::Flatten(flatten) => flatten.node.service_usage(),
451            Self::Defer { primary, deferred } => primary
452                .node
453                .as_ref()
454                .map(|n| {
455                    Box::new(
456                        n.service_usage().chain(
457                            deferred
458                                .iter()
459                                .flat_map(|d| d.node.iter().flat_map(|node| node.service_usage())),
460                        ),
461                    ) as Box<dyn Iterator<Item = &'a str> + 'a>
462                })
463                .unwrap_or_else(|| {
464                    Box::new(std::iter::empty()) as Box<dyn Iterator<Item = &'a str> + 'a>
465                }),
466
467            Self::Condition {
468                if_clause,
469                else_clause,
470                ..
471            } => match (if_clause, else_clause) {
472                (None, None) => Box::new(None.into_iter()),
473                (None, Some(node)) => node.service_usage(),
474                (Some(node), None) => node.service_usage(),
475                (Some(if_node), Some(else_node)) => {
476                    Box::new(if_node.service_usage().chain(else_node.service_usage()))
477                }
478            },
479        }
480    }
481
482    pub(crate) fn extract_authorization_metadata(
483        &mut self,
484        schema: &Valid<apollo_compiler::Schema>,
485        key: &CacheKeyMetadata,
486    ) {
487        match self {
488            PlanNode::Fetch(fetch_node) => {
489                fetch_node.extract_authorization_metadata(schema, key);
490            }
491
492            PlanNode::Sequence { nodes } => {
493                for node in nodes {
494                    node.extract_authorization_metadata(schema, key);
495                }
496            }
497            PlanNode::Parallel { nodes } => {
498                for node in nodes {
499                    node.extract_authorization_metadata(schema, key);
500                }
501            }
502            PlanNode::Flatten(flatten) => flatten.node.extract_authorization_metadata(schema, key),
503            PlanNode::Defer { primary, deferred } => {
504                if let Some(node) = primary.node.as_mut() {
505                    node.extract_authorization_metadata(schema, key);
506                }
507                for deferred_node in deferred {
508                    if let Some(node) = deferred_node.node.take() {
509                        let mut new_node = (*node).clone();
510                        new_node.extract_authorization_metadata(schema, key);
511                        deferred_node.node = Some(Arc::new(new_node));
512                    }
513                }
514            }
515            PlanNode::Subscription { primary: _, rest } => {
516                if let Some(node) = rest.as_mut() {
517                    node.extract_authorization_metadata(schema, key);
518                }
519            }
520            PlanNode::Condition {
521                condition: _,
522                if_clause,
523                else_clause,
524            } => {
525                if let Some(node) = if_clause.as_mut() {
526                    node.extract_authorization_metadata(schema, key);
527                }
528                if let Some(node) = else_clause.as_mut() {
529                    node.extract_authorization_metadata(schema, key);
530                }
531            }
532        }
533    }
534}
535
/// A flatten node.
///
/// Wraps a child plan together with the path at which the child's result is merged.
#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct FlattenNode {
    /// The path where the result should be merged.
    pub(crate) path: Path,

    /// The child execution plan.
    pub(crate) node: Box<PlanNode>,
}
546
/// A primary query for a Defer node, the non deferred part
#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct Primary {
    /// The part of the original query that "selects" the data to
    /// send in that primary response (once the plan in `node` completes).
    pub(crate) subselection: Option<String>,

    /// The plan to get all the data for that primary part
    pub(crate) node: Option<Box<PlanNode>>,
}
558
/// The "deferred" parts of the defer (note that it's an array). Each
/// of those deferred elements will correspond to a different chunk of
/// the response to the client (after the initial non-deferred one that is).
#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct DeferredNode {
    /// References one or more fetch node(s) (by `id`) within
    /// `primary.node`. The plan of this deferred part should not
    /// be started before all those fetches return.
    pub(crate) depends: Vec<Depends>,

    /// The optional defer label.
    pub(crate) label: Option<String>,
    /// Path to the @defer this corresponds to. `subselection` starts at that `path`.
    pub(crate) query_path: Path,
    /// The part of the original query that "selects" the data to send
    /// in that deferred response (once the plan in `node` completes).
    /// Will be set _unless_ `node` is a `DeferNode` itself.
    pub(crate) subselection: Option<String>,
    /// The plan to get all the data for that deferred part
    pub(crate) node: Option<Arc<PlanNode>>,
}
581
/// A dependency of a deferred node: a reference to a fetch node by `id`.
#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct Depends {
    /// Identifier of the referenced fetch node.
    pub(crate) id: String,
}
588
#[cfg(test)]
mod test {
    use crate::query_planner::QueryPlan;

    /// The size estimate of a plan is non-zero and identical across repeated calls.
    #[test]
    fn test_estimated_size() {
        let plan = QueryPlan::fake_builder().build();

        let first_estimate = plan.estimated_size();
        let second_estimate = plan.estimated_size();

        assert!(first_estimate > 0);
        assert_eq!(first_estimate, second_estimate);
    }
}