1use std::sync::Arc;
2use std::sync::atomic::AtomicUsize;
3use std::sync::atomic::Ordering;
4
5use apollo_compiler::validation::Valid;
6use serde::Deserialize;
7use serde::Serialize;
8
9pub(crate) use self::fetch::OperationKind;
10use super::fetch;
11use super::subscription::SubscriptionNode;
12use crate::apollo_studio_interop::UsageReporting;
13use crate::cache::estimate_size;
14use crate::configuration::Batching;
15use crate::error::CacheResolverError;
16use crate::error::ValidationErrors;
17use crate::json_ext::Object;
18use crate::json_ext::Path;
19use crate::json_ext::Value;
20use crate::plugins::authorization::CacheKeyMetadata;
21use crate::query_planner::fetch::SubgraphSchemas;
22use crate::services::query_planner::PlanOptions;
23use crate::spec::Query;
24use crate::spec::QueryHash;
25use crate::spec::operation_limits::OperationLimits;
26
27#[derive(Clone)]
31pub(crate) struct QueryKey {
32 pub(crate) filtered_query: String,
33 pub(crate) original_query: String,
34 pub(crate) operation_name: Option<String>,
35 pub(crate) metadata: CacheKeyMetadata,
36 pub(crate) plan_options: PlanOptions,
37}
38
39#[derive(Clone, Debug, Serialize, Deserialize)]
41pub struct QueryPlan {
42 pub(crate) usage_reporting: Arc<UsageReporting>,
43 pub(crate) root: Arc<PlanNode>,
44 pub(crate) formatted_query_plan: Option<Arc<String>>,
46 pub(crate) query: Arc<Query>,
47 pub(crate) query_metrics: OperationLimits<u32>,
48
49 #[serde(default)]
51 pub(crate) estimated_size: Arc<AtomicUsize>,
52}
53
54#[buildstructor::buildstructor]
57impl QueryPlan {
58 #[builder]
59 pub(crate) fn fake_new(
60 root: Option<PlanNode>,
61 usage_reporting: Option<UsageReporting>,
62 ) -> Self {
63 Self {
64 usage_reporting: usage_reporting
65 .unwrap_or_else(|| UsageReporting {
66 stats_report_key: "this is a test report key".to_string(),
67 referenced_fields_by_type: Default::default(),
68 })
69 .into(),
70 root: Arc::new(root.unwrap_or_else(|| PlanNode::Sequence { nodes: Vec::new() })),
71 formatted_query_plan: Default::default(),
72 query: Arc::new(Query::empty_for_tests()),
73 query_metrics: Default::default(),
74 estimated_size: Default::default(),
75 }
76 }
77}
78
79impl QueryPlan {
80 pub(crate) fn is_deferred(&self, variables: &Object) -> bool {
81 self.root.is_deferred(variables, &self.query)
82 }
83
84 pub(crate) fn is_subscription(&self) -> bool {
85 matches!(self.query.operation.kind(), OperationKind::Subscription)
86 }
87
88 pub(crate) fn query_hashes(
89 &self,
90 batching_config: Batching,
91 variables: &Object,
92 ) -> Result<Vec<Arc<QueryHash>>, CacheResolverError> {
93 self.root
94 .query_hashes(batching_config, variables, &self.query)
95 }
96
97 pub(crate) fn estimated_size(&self) -> usize {
98 if self.estimated_size.load(Ordering::SeqCst) == 0 {
99 self.estimated_size
100 .store(estimate_size(self), Ordering::SeqCst);
101 }
102 self.estimated_size.load(Ordering::SeqCst)
103 }
104}
105
106#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
108#[serde(rename_all = "PascalCase", tag = "kind")]
109pub(crate) enum PlanNode {
110 Sequence {
112 nodes: Vec<PlanNode>,
114 },
115
116 Parallel {
118 nodes: Vec<PlanNode>,
120 },
121
122 Fetch(fetch::FetchNode),
124
125 Flatten(FlattenNode),
127
128 Defer {
129 primary: Primary,
130 deferred: Vec<DeferredNode>,
131 },
132
133 Subscription {
134 primary: SubscriptionNode,
135 rest: Option<Box<PlanNode>>,
136 },
137
138 #[serde(rename_all = "camelCase")]
139 Condition {
140 condition: String,
141 if_clause: Option<Box<PlanNode>>,
142 else_clause: Option<Box<PlanNode>>,
143 },
144}
145
146impl PlanNode {
147 pub(crate) fn contains_mutations(&self) -> bool {
148 match self {
149 Self::Sequence { nodes } => nodes.iter().any(|n| n.contains_mutations()),
150 Self::Parallel { nodes } => nodes.iter().any(|n| n.contains_mutations()),
151 Self::Fetch(fetch_node) => fetch_node.operation_kind() == &OperationKind::Mutation,
152 Self::Defer { primary, .. } => primary
153 .node
154 .as_ref()
155 .map(|n| n.contains_mutations())
156 .unwrap_or(false),
157 Self::Subscription { .. } => false,
158 Self::Flatten(_) => false,
159 Self::Condition {
160 if_clause,
161 else_clause,
162 ..
163 } => {
164 if let Some(node) = if_clause {
165 if node.contains_mutations() {
166 return true;
167 }
168 }
169 if let Some(node) = else_clause {
170 if node.contains_mutations() {
171 return true;
172 }
173 }
174 false
175 }
176 }
177 }
178
179 pub(crate) fn is_deferred(&self, variables: &Object, query: &Query) -> bool {
180 match self {
181 Self::Sequence { nodes } => nodes.iter().any(|n| n.is_deferred(variables, query)),
182 Self::Parallel { nodes } => nodes.iter().any(|n| n.is_deferred(variables, query)),
183 Self::Flatten(node) => node.node.is_deferred(variables, query),
184 Self::Fetch(..) => false,
185 Self::Defer { .. } => true,
186 Self::Subscription { .. } => false,
187 Self::Condition {
188 if_clause,
189 else_clause,
190 condition,
191 } => {
192 if query
193 .variable_value(condition.as_str(), variables)
194 .map(|v| *v == Value::Bool(true))
195 .unwrap_or(true)
196 {
197 if let Some(node) = if_clause {
200 if node.is_deferred(variables, query) {
201 return true;
202 }
203 }
204 } else if let Some(node) = else_clause {
205 if node.is_deferred(variables, query) {
206 return true;
207 }
208 }
209
210 false
211 }
212 }
213 }
214
215 pub(crate) fn query_hashes(
228 &self,
229 batching_config: Batching,
230 variables: &Object,
231 query: &Query,
232 ) -> Result<Vec<Arc<QueryHash>>, CacheResolverError> {
233 let mut query_hashes = vec![];
234 let mut new_targets = vec![self];
235
236 loop {
237 let targets = new_targets;
238 if targets.is_empty() {
239 break;
240 }
241
242 new_targets = vec![];
243 for target in targets {
244 match target {
245 PlanNode::Sequence { nodes } | PlanNode::Parallel { nodes } => {
246 new_targets.extend(nodes);
247 }
248 PlanNode::Fetch(node) => {
249 if node.requires.is_empty()
251 && batching_config.batch_include(&node.service_name)
252 {
253 query_hashes.push(node.schema_aware_hash.clone());
254 }
255 }
256 PlanNode::Flatten(node) => new_targets.push(&node.node),
257 PlanNode::Defer { .. } => {
258 return Err(CacheResolverError::BatchingError(
259 "unexpected defer node encountered during query_hash processing"
260 .to_string(),
261 ));
262 }
263 PlanNode::Subscription { .. } => {
264 return Err(CacheResolverError::BatchingError(
265 "unexpected subscription node encountered during query_hash processing"
266 .to_string(),
267 ));
268 }
269 PlanNode::Condition {
270 if_clause,
271 else_clause,
272 condition,
273 } => {
274 if query
275 .variable_value(condition.as_str(), variables)
276 .map(|v| *v == Value::Bool(true))
277 .unwrap_or(true)
278 {
279 if let Some(node) = if_clause {
280 new_targets.push(node);
281 }
282 } else if let Some(node) = else_clause {
283 new_targets.push(node);
284 }
285 }
286 }
287 }
288 }
289 Ok(query_hashes)
290 }
291
292 pub(crate) fn subgraph_fetches(&self) -> usize {
293 match self {
294 PlanNode::Sequence { nodes } => nodes.iter().map(|n| n.subgraph_fetches()).sum(),
295 PlanNode::Parallel { nodes } => nodes.iter().map(|n| n.subgraph_fetches()).sum(),
296 PlanNode::Fetch(_) => 1,
297 PlanNode::Flatten(node) => node.node.subgraph_fetches(),
298 PlanNode::Defer { primary, deferred } => {
299 primary.node.as_ref().map_or(0, |n| n.subgraph_fetches())
300 + deferred
301 .iter()
302 .map(|n| n.node.as_ref().map_or(0, |n| n.subgraph_fetches()))
303 .sum::<usize>()
304 }
305 PlanNode::Subscription { rest, .. } => {
307 rest.as_ref().map_or(0, |n| n.subgraph_fetches()) + 1
308 }
309 PlanNode::Condition {
311 if_clause,
312 else_clause,
313 ..
314 } => std::cmp::max(
315 if_clause
316 .as_ref()
317 .map(|n| n.subgraph_fetches())
318 .unwrap_or(0),
319 else_clause
320 .as_ref()
321 .map(|n| n.subgraph_fetches())
322 .unwrap_or(0),
323 ),
324 }
325 }
326
327 pub(crate) fn init_parsed_operations(
328 &mut self,
329 subgraph_schemas: &SubgraphSchemas,
330 ) -> Result<(), ValidationErrors> {
331 match self {
332 PlanNode::Fetch(fetch_node) => {
333 fetch_node.init_parsed_operation(subgraph_schemas)?;
334 }
335
336 PlanNode::Sequence { nodes } => {
337 for node in nodes {
338 node.init_parsed_operations(subgraph_schemas)?;
339 }
340 }
341 PlanNode::Parallel { nodes } => {
342 for node in nodes {
343 node.init_parsed_operations(subgraph_schemas)?;
344 }
345 }
346 PlanNode::Flatten(flatten) => flatten.node.init_parsed_operations(subgraph_schemas)?,
347 PlanNode::Defer { primary, deferred } => {
348 if let Some(node) = primary.node.as_mut() {
349 node.init_parsed_operations(subgraph_schemas)?;
350 }
351 for deferred_node in deferred {
352 if let Some(node) = &mut deferred_node.node {
353 Arc::make_mut(node).init_parsed_operations(subgraph_schemas)?;
354 }
355 }
356 }
357 PlanNode::Subscription { primary, rest } => {
358 primary.init_parsed_operation(subgraph_schemas)?;
359 if let Some(node) = rest.as_mut() {
360 node.init_parsed_operations(subgraph_schemas)?;
361 }
362 }
363 PlanNode::Condition {
364 condition: _,
365 if_clause,
366 else_clause,
367 } => {
368 if let Some(node) = if_clause.as_mut() {
369 node.init_parsed_operations(subgraph_schemas)?;
370 }
371 if let Some(node) = else_clause.as_mut() {
372 node.init_parsed_operations(subgraph_schemas)?;
373 }
374 }
375 }
376 Ok(())
377 }
378
379 pub(crate) fn init_parsed_operations_and_hash_subqueries(
380 &mut self,
381 subgraph_schemas: &SubgraphSchemas,
382 ) -> Result<(), ValidationErrors> {
383 match self {
384 PlanNode::Fetch(fetch_node) => {
385 fetch_node.init_parsed_operation_and_hash_subquery(subgraph_schemas)?;
386 }
387
388 PlanNode::Sequence { nodes } => {
389 for node in nodes {
390 node.init_parsed_operations_and_hash_subqueries(subgraph_schemas)?;
391 }
392 }
393 PlanNode::Parallel { nodes } => {
394 for node in nodes {
395 node.init_parsed_operations_and_hash_subqueries(subgraph_schemas)?;
396 }
397 }
398 PlanNode::Flatten(flatten) => flatten
399 .node
400 .init_parsed_operations_and_hash_subqueries(subgraph_schemas)?,
401 PlanNode::Defer { primary, deferred } => {
402 if let Some(node) = primary.node.as_mut() {
403 node.init_parsed_operations_and_hash_subqueries(subgraph_schemas)?;
404 }
405 for deferred_node in deferred {
406 if let Some(node) = &mut deferred_node.node {
407 Arc::make_mut(node)
408 .init_parsed_operations_and_hash_subqueries(subgraph_schemas)?
409 }
410 }
411 }
412 PlanNode::Subscription { primary: _, rest } => {
413 if let Some(node) = rest.as_mut() {
414 node.init_parsed_operations_and_hash_subqueries(subgraph_schemas)?;
415 }
416 }
417 PlanNode::Condition {
418 condition: _,
419 if_clause,
420 else_clause,
421 } => {
422 if let Some(node) = if_clause.as_mut() {
423 node.init_parsed_operations_and_hash_subqueries(subgraph_schemas)?;
424 }
425 if let Some(node) = else_clause.as_mut() {
426 node.init_parsed_operations_and_hash_subqueries(subgraph_schemas)?;
427 }
428 }
429 }
430 Ok(())
431 }
432
433 #[cfg(test)]
434 pub(crate) fn service_usage<'a>(&'a self) -> Box<dyn Iterator<Item = &'a str> + 'a> {
438 match self {
439 Self::Sequence { nodes } | Self::Parallel { nodes } => {
440 Box::new(nodes.iter().flat_map(|x| x.service_usage()))
441 }
442 Self::Fetch(fetch) => Box::new(Some(fetch.service_name()).into_iter()),
443 Self::Subscription { primary, rest } => match rest {
444 Some(rest) => Box::new(
445 rest.service_usage()
446 .chain(Some(primary.service_name.as_ref())),
447 ) as Box<dyn Iterator<Item = &'a str> + 'a>,
448 None => Box::new(Some(primary.service_name.as_ref()).into_iter()),
449 },
450 Self::Flatten(flatten) => flatten.node.service_usage(),
451 Self::Defer { primary, deferred } => primary
452 .node
453 .as_ref()
454 .map(|n| {
455 Box::new(
456 n.service_usage().chain(
457 deferred
458 .iter()
459 .flat_map(|d| d.node.iter().flat_map(|node| node.service_usage())),
460 ),
461 ) as Box<dyn Iterator<Item = &'a str> + 'a>
462 })
463 .unwrap_or_else(|| {
464 Box::new(std::iter::empty()) as Box<dyn Iterator<Item = &'a str> + 'a>
465 }),
466
467 Self::Condition {
468 if_clause,
469 else_clause,
470 ..
471 } => match (if_clause, else_clause) {
472 (None, None) => Box::new(None.into_iter()),
473 (None, Some(node)) => node.service_usage(),
474 (Some(node), None) => node.service_usage(),
475 (Some(if_node), Some(else_node)) => {
476 Box::new(if_node.service_usage().chain(else_node.service_usage()))
477 }
478 },
479 }
480 }
481
482 pub(crate) fn extract_authorization_metadata(
483 &mut self,
484 schema: &Valid<apollo_compiler::Schema>,
485 key: &CacheKeyMetadata,
486 ) {
487 match self {
488 PlanNode::Fetch(fetch_node) => {
489 fetch_node.extract_authorization_metadata(schema, key);
490 }
491
492 PlanNode::Sequence { nodes } => {
493 for node in nodes {
494 node.extract_authorization_metadata(schema, key);
495 }
496 }
497 PlanNode::Parallel { nodes } => {
498 for node in nodes {
499 node.extract_authorization_metadata(schema, key);
500 }
501 }
502 PlanNode::Flatten(flatten) => flatten.node.extract_authorization_metadata(schema, key),
503 PlanNode::Defer { primary, deferred } => {
504 if let Some(node) = primary.node.as_mut() {
505 node.extract_authorization_metadata(schema, key);
506 }
507 for deferred_node in deferred {
508 if let Some(node) = deferred_node.node.take() {
509 let mut new_node = (*node).clone();
510 new_node.extract_authorization_metadata(schema, key);
511 deferred_node.node = Some(Arc::new(new_node));
512 }
513 }
514 }
515 PlanNode::Subscription { primary: _, rest } => {
516 if let Some(node) = rest.as_mut() {
517 node.extract_authorization_metadata(schema, key);
518 }
519 }
520 PlanNode::Condition {
521 condition: _,
522 if_clause,
523 else_clause,
524 } => {
525 if let Some(node) = if_clause.as_mut() {
526 node.extract_authorization_metadata(schema, key);
527 }
528 if let Some(node) = else_clause.as_mut() {
529 node.extract_authorization_metadata(schema, key);
530 }
531 }
532 }
533 }
534}
535
536#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
538#[serde(rename_all = "camelCase")]
539pub(crate) struct FlattenNode {
540 pub(crate) path: Path,
542
543 pub(crate) node: Box<PlanNode>,
545}
546
547#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
549#[serde(rename_all = "camelCase")]
550pub(crate) struct Primary {
551 pub(crate) subselection: Option<String>,
554
555 pub(crate) node: Option<Box<PlanNode>>,
557}
558
559#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
563#[serde(rename_all = "camelCase")]
564pub(crate) struct DeferredNode {
565 pub(crate) depends: Vec<Depends>,
569
570 pub(crate) label: Option<String>,
572 pub(crate) query_path: Path,
574 pub(crate) subselection: Option<String>,
578 pub(crate) node: Option<Arc<PlanNode>>,
580}
581
582#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
584#[serde(rename_all = "camelCase")]
585pub(crate) struct Depends {
586 pub(crate) id: String,
587}
588
589#[cfg(test)]
590mod test {
591 use crate::query_planner::QueryPlan;
592
593 #[test]
594 fn test_estimated_size() {
595 let query_plan = QueryPlan::fake_builder().build();
596 let size1 = query_plan.estimated_size();
597 let size2 = query_plan.estimated_size();
598 assert!(size1 > 0);
599 assert_eq!(size1, size2);
600 }
601}