1use std::sync::Arc;
2use std::sync::atomic::AtomicUsize;
3use std::sync::atomic::Ordering;
4
5use apollo_compiler::collections::HashSet;
6use apollo_compiler::validation::Valid;
7use serde::Deserialize;
8use serde::Serialize;
9
10pub(crate) use self::fetch::OperationKind;
11use super::fetch;
12use super::subscription::SubscriptionNode;
13use crate::apollo_studio_interop::UsageReporting;
14use crate::cache::estimate_size;
15use crate::configuration::Batching;
16use crate::error::CacheResolverError;
17use crate::error::ValidationErrors;
18use crate::json_ext::Object;
19use crate::json_ext::Path;
20use crate::json_ext::Value;
21use crate::plugins::authorization::CacheKeyMetadata;
22use crate::query_planner::fetch::SubgraphSchemas;
23use crate::services::query_planner::PlanOptions;
24use crate::spec::Query;
25use crate::spec::QueryHash;
26use crate::spec::operation_limits::OperationLimits;
27
28#[derive(Clone)]
32pub(crate) struct QueryKey {
33 pub(crate) filtered_query: String,
34 pub(crate) original_query: String,
35 pub(crate) operation_name: Option<String>,
36 pub(crate) metadata: CacheKeyMetadata,
37 pub(crate) plan_options: PlanOptions,
38}
39
40#[derive(Clone, Debug, Serialize, Deserialize)]
42pub struct QueryPlan {
43 pub(crate) usage_reporting: Arc<UsageReporting>,
44 pub(crate) root: Arc<PlanNode>,
45 pub(crate) formatted_query_plan: Option<Arc<String>>,
47 pub(crate) query: Arc<Query>,
48 pub(crate) query_metrics: OperationLimits<u32>,
49
50 #[serde(default)]
52 pub(crate) estimated_size: Arc<AtomicUsize>,
53}
54
55#[buildstructor::buildstructor]
58impl QueryPlan {
59 #[builder]
60 pub(crate) fn fake_new(
61 root: Option<PlanNode>,
62 usage_reporting: Option<UsageReporting>,
63 ) -> Self {
64 Self {
65 usage_reporting: usage_reporting
66 .unwrap_or_else(|| UsageReporting::Error("this is a test report key".to_string()))
67 .into(),
68 root: Arc::new(root.unwrap_or_else(|| PlanNode::Sequence { nodes: Vec::new() })),
69 formatted_query_plan: Default::default(),
70 query: Arc::new(Query::empty_for_tests()),
71 query_metrics: Default::default(),
72 estimated_size: Default::default(),
73 }
74 }
75}
76
77impl QueryPlan {
78 pub(crate) fn is_deferred(&self, variables: &Object) -> bool {
79 self.root.is_deferred(variables, &self.query)
80 }
81
82 pub(crate) fn is_subscription(&self) -> bool {
83 matches!(self.query.operation.kind(), OperationKind::Subscription)
84 }
85
86 pub(crate) fn query_hashes(
87 &self,
88 batching_config: Batching,
89 variables: &Object,
90 ) -> Result<Vec<Arc<QueryHash>>, CacheResolverError> {
91 self.root
92 .query_hashes(batching_config, variables, &self.query)
93 }
94
95 pub(crate) fn estimated_size(&self) -> usize {
96 if self.estimated_size.load(Ordering::SeqCst) == 0 {
97 self.estimated_size
98 .store(estimate_size(self), Ordering::SeqCst);
99 }
100 self.estimated_size.load(Ordering::SeqCst)
101 }
102}
103
104#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
106#[serde(rename_all = "PascalCase", tag = "kind")]
107pub(crate) enum PlanNode {
108 Sequence {
110 nodes: Vec<PlanNode>,
112 },
113
114 Parallel {
116 nodes: Vec<PlanNode>,
118 },
119
120 Fetch(fetch::FetchNode),
122
123 Flatten(FlattenNode),
125
126 Defer {
127 primary: Primary,
128 deferred: Vec<DeferredNode>,
129 },
130
131 Subscription {
132 primary: SubscriptionNode,
133 rest: Option<Box<PlanNode>>,
134 },
135
136 #[serde(rename_all = "camelCase")]
137 Condition {
138 condition: String,
139 if_clause: Option<Box<PlanNode>>,
140 else_clause: Option<Box<PlanNode>>,
141 },
142}
143
144impl PlanNode {
145 pub(crate) fn contains_mutations(&self) -> bool {
146 match self {
147 Self::Sequence { nodes } => nodes.iter().any(|n| n.contains_mutations()),
148 Self::Parallel { nodes } => nodes.iter().any(|n| n.contains_mutations()),
149 Self::Fetch(fetch_node) => fetch_node.operation_kind() == &OperationKind::Mutation,
150 Self::Defer { primary, .. } => primary
151 .node
152 .as_ref()
153 .map(|n| n.contains_mutations())
154 .unwrap_or(false),
155 Self::Subscription { .. } => false,
156 Self::Flatten(_) => false,
157 Self::Condition {
158 if_clause,
159 else_clause,
160 ..
161 } => {
162 if let Some(node) = if_clause
163 && node.contains_mutations()
164 {
165 return true;
166 }
167 if let Some(node) = else_clause
168 && node.contains_mutations()
169 {
170 return true;
171 }
172 false
173 }
174 }
175 }
176
177 pub(crate) fn is_deferred(&self, variables: &Object, query: &Query) -> bool {
178 match self {
179 Self::Sequence { nodes } => nodes.iter().any(|n| n.is_deferred(variables, query)),
180 Self::Parallel { nodes } => nodes.iter().any(|n| n.is_deferred(variables, query)),
181 Self::Flatten(node) => node.node.is_deferred(variables, query),
182 Self::Fetch(..) => false,
183 Self::Defer { .. } => true,
184 Self::Subscription { .. } => false,
185 Self::Condition {
186 if_clause,
187 else_clause,
188 condition,
189 } => {
190 if query
191 .variable_value(condition.as_str(), variables)
192 .map(|v| *v == Value::Bool(true))
193 .unwrap_or(true)
194 {
195 if let Some(node) = if_clause
198 && node.is_deferred(variables, query)
199 {
200 return true;
201 }
202 } else if let Some(node) = else_clause
203 && node.is_deferred(variables, query)
204 {
205 return true;
206 }
207
208 false
209 }
210 }
211 }
212
213 pub(crate) fn query_hashes(
226 &self,
227 batching_config: Batching,
228 variables: &Object,
229 query: &Query,
230 ) -> Result<Vec<Arc<QueryHash>>, CacheResolverError> {
231 let mut query_hashes = vec![];
232 let mut new_targets = vec![self];
233
234 loop {
235 let targets = new_targets;
236 if targets.is_empty() {
237 break;
238 }
239
240 new_targets = vec![];
241 for target in targets {
242 match target {
243 PlanNode::Sequence { nodes } | PlanNode::Parallel { nodes } => {
244 new_targets.extend(nodes);
245 }
246 PlanNode::Fetch(node) => {
247 if node.requires.is_empty()
249 && batching_config.batch_include(&node.service_name)
250 {
251 query_hashes.push(node.schema_aware_hash.clone());
252 }
253 }
254 PlanNode::Flatten(node) => new_targets.push(&node.node),
255 PlanNode::Defer { .. } => {
256 return Err(CacheResolverError::BatchingError(
257 "unexpected defer node encountered during query_hash processing"
258 .to_string(),
259 ));
260 }
261 PlanNode::Subscription { .. } => {
262 return Err(CacheResolverError::BatchingError(
263 "unexpected subscription node encountered during query_hash processing"
264 .to_string(),
265 ));
266 }
267 PlanNode::Condition {
268 if_clause,
269 else_clause,
270 condition,
271 } => {
272 if query
273 .variable_value(condition.as_str(), variables)
274 .map(|v| *v == Value::Bool(true))
275 .unwrap_or(true)
276 {
277 if let Some(node) = if_clause {
278 new_targets.push(node);
279 }
280 } else if let Some(node) = else_clause {
281 new_targets.push(node);
282 }
283 }
284 }
285 }
286 }
287 Ok(query_hashes)
288 }
289
290 pub(crate) fn subgraph_fetches(&self) -> usize {
291 match self {
292 PlanNode::Sequence { nodes } => nodes.iter().map(|n| n.subgraph_fetches()).sum(),
293 PlanNode::Parallel { nodes } => nodes.iter().map(|n| n.subgraph_fetches()).sum(),
294 PlanNode::Fetch(_) => 1,
295 PlanNode::Flatten(node) => node.node.subgraph_fetches(),
296 PlanNode::Defer { primary, deferred } => {
297 primary.node.as_ref().map_or(0, |n| n.subgraph_fetches())
298 + deferred
299 .iter()
300 .map(|n| n.node.as_ref().map_or(0, |n| n.subgraph_fetches()))
301 .sum::<usize>()
302 }
303 PlanNode::Subscription { rest, .. } => {
305 rest.as_ref().map_or(0, |n| n.subgraph_fetches()) + 1
306 }
307 PlanNode::Condition {
309 if_clause,
310 else_clause,
311 ..
312 } => std::cmp::max(
313 if_clause
314 .as_ref()
315 .map(|n| n.subgraph_fetches())
316 .unwrap_or(0),
317 else_clause
318 .as_ref()
319 .map(|n| n.subgraph_fetches())
320 .unwrap_or(0),
321 ),
322 }
323 }
324
325 pub(crate) fn init_parsed_operations(
326 &mut self,
327 subgraph_schemas: &SubgraphSchemas,
328 ) -> Result<(), ValidationErrors> {
329 match self {
330 PlanNode::Fetch(fetch_node) => {
331 fetch_node.init_parsed_operation(subgraph_schemas)?;
332 }
333
334 PlanNode::Sequence { nodes } => {
335 for node in nodes {
336 node.init_parsed_operations(subgraph_schemas)?;
337 }
338 }
339 PlanNode::Parallel { nodes } => {
340 for node in nodes {
341 node.init_parsed_operations(subgraph_schemas)?;
342 }
343 }
344 PlanNode::Flatten(flatten) => flatten.node.init_parsed_operations(subgraph_schemas)?,
345 PlanNode::Defer { primary, deferred } => {
346 if let Some(node) = primary.node.as_mut() {
347 node.init_parsed_operations(subgraph_schemas)?;
348 }
349 for deferred_node in deferred {
350 if let Some(node) = &mut deferred_node.node {
351 Arc::make_mut(node).init_parsed_operations(subgraph_schemas)?;
352 }
353 }
354 }
355 PlanNode::Subscription { primary, rest } => {
356 primary.init_parsed_operation(subgraph_schemas)?;
357 if let Some(node) = rest.as_mut() {
358 node.init_parsed_operations(subgraph_schemas)?;
359 }
360 }
361 PlanNode::Condition {
362 condition: _,
363 if_clause,
364 else_clause,
365 } => {
366 if let Some(node) = if_clause.as_mut() {
367 node.init_parsed_operations(subgraph_schemas)?;
368 }
369 if let Some(node) = else_clause.as_mut() {
370 node.init_parsed_operations(subgraph_schemas)?;
371 }
372 }
373 }
374 Ok(())
375 }
376
377 pub(crate) fn init_parsed_operations_and_hash_subqueries(
378 &mut self,
379 subgraph_schemas: &SubgraphSchemas,
380 ) -> Result<(), ValidationErrors> {
381 match self {
382 PlanNode::Fetch(fetch_node) => {
383 fetch_node.init_parsed_operation_and_hash_subquery(subgraph_schemas)?;
384 }
385
386 PlanNode::Sequence { nodes } => {
387 for node in nodes {
388 node.init_parsed_operations_and_hash_subqueries(subgraph_schemas)?;
389 }
390 }
391 PlanNode::Parallel { nodes } => {
392 for node in nodes {
393 node.init_parsed_operations_and_hash_subqueries(subgraph_schemas)?;
394 }
395 }
396 PlanNode::Flatten(flatten) => flatten
397 .node
398 .init_parsed_operations_and_hash_subqueries(subgraph_schemas)?,
399 PlanNode::Defer { primary, deferred } => {
400 if let Some(node) = primary.node.as_mut() {
401 node.init_parsed_operations_and_hash_subqueries(subgraph_schemas)?;
402 }
403 for deferred_node in deferred {
404 if let Some(node) = &mut deferred_node.node {
405 Arc::make_mut(node)
406 .init_parsed_operations_and_hash_subqueries(subgraph_schemas)?
407 }
408 }
409 }
410 PlanNode::Subscription { primary: _, rest } => {
411 if let Some(node) = rest.as_mut() {
412 node.init_parsed_operations_and_hash_subqueries(subgraph_schemas)?;
413 }
414 }
415 PlanNode::Condition {
416 condition: _,
417 if_clause,
418 else_clause,
419 } => {
420 if let Some(node) = if_clause.as_mut() {
421 node.init_parsed_operations_and_hash_subqueries(subgraph_schemas)?;
422 }
423 if let Some(node) = else_clause.as_mut() {
424 node.init_parsed_operations_and_hash_subqueries(subgraph_schemas)?;
425 }
426 }
427 }
428 Ok(())
429 }
430
431 #[cfg(test)]
432 pub(crate) fn service_usage<'a>(&'a self) -> Box<dyn Iterator<Item = &'a str> + 'a> {
436 match self {
437 Self::Sequence { nodes } | Self::Parallel { nodes } => {
438 Box::new(nodes.iter().flat_map(|x| x.service_usage()))
439 }
440 Self::Fetch(fetch) => Box::new(Some(fetch.service_name()).into_iter()),
441 Self::Subscription { primary, rest } => match rest {
442 Some(rest) => Box::new(
443 rest.service_usage()
444 .chain(Some(primary.service_name.as_ref())),
445 ) as Box<dyn Iterator<Item = &'a str> + 'a>,
446 None => Box::new(Some(primary.service_name.as_ref()).into_iter()),
447 },
448 Self::Flatten(flatten) => flatten.node.service_usage(),
449 Self::Defer { primary, deferred } => primary
450 .node
451 .as_ref()
452 .map(|n| {
453 Box::new(
454 n.service_usage().chain(
455 deferred
456 .iter()
457 .flat_map(|d| d.node.iter().flat_map(|node| node.service_usage())),
458 ),
459 ) as Box<dyn Iterator<Item = &'a str> + 'a>
460 })
461 .unwrap_or_else(|| {
462 Box::new(std::iter::empty()) as Box<dyn Iterator<Item = &'a str> + 'a>
463 }),
464
465 Self::Condition {
466 if_clause,
467 else_clause,
468 ..
469 } => match (if_clause, else_clause) {
470 (None, None) => Box::new(None.into_iter()),
471 (None, Some(node)) => node.service_usage(),
472 (Some(node), None) => node.service_usage(),
473 (Some(if_node), Some(else_node)) => {
474 Box::new(if_node.service_usage().chain(else_node.service_usage()))
475 }
476 },
477 }
478 }
479
480 pub(crate) fn service_usage_set(&self) -> HashSet<&str> {
483 let mut services = HashSet::default();
484 let mut stack = vec![self];
485 while let Some(node) = stack.pop() {
486 match node {
487 Self::Sequence { nodes } | Self::Parallel { nodes } => {
488 stack.extend(nodes.iter());
489 }
490 Self::Fetch(fetch) => {
491 services.insert(fetch.service_name.as_ref());
492 }
493 Self::Subscription { primary, rest } => {
494 services.insert(primary.service_name.as_ref());
495 if let Some(rest) = rest {
496 stack.push(rest);
497 }
498 }
499 Self::Flatten(flatten) => {
500 stack.push(&flatten.node);
501 }
502 Self::Defer { primary, deferred } => {
503 if let Some(primary) = primary.node.as_ref() {
504 stack.push(primary);
505 }
506 stack.extend(deferred.iter().flat_map(|d| d.node.as_deref()));
507 }
508 Self::Condition {
509 if_clause,
510 else_clause,
511 ..
512 } => {
513 if let Some(if_clause) = if_clause {
514 stack.push(if_clause);
515 }
516 if let Some(else_clause) = else_clause {
517 stack.push(else_clause);
518 }
519 }
520 }
521 }
522 services
523 }
524
525 pub(crate) fn extract_authorization_metadata(
526 &mut self,
527 schema: &Valid<apollo_compiler::Schema>,
528 key: &CacheKeyMetadata,
529 ) {
530 match self {
531 PlanNode::Fetch(fetch_node) => {
532 fetch_node.extract_authorization_metadata(schema, key);
533 }
534
535 PlanNode::Sequence { nodes } => {
536 for node in nodes {
537 node.extract_authorization_metadata(schema, key);
538 }
539 }
540 PlanNode::Parallel { nodes } => {
541 for node in nodes {
542 node.extract_authorization_metadata(schema, key);
543 }
544 }
545 PlanNode::Flatten(flatten) => flatten.node.extract_authorization_metadata(schema, key),
546 PlanNode::Defer { primary, deferred } => {
547 if let Some(node) = primary.node.as_mut() {
548 node.extract_authorization_metadata(schema, key);
549 }
550 for deferred_node in deferred {
551 if let Some(node) = deferred_node.node.take() {
552 let mut new_node = (*node).clone();
553 new_node.extract_authorization_metadata(schema, key);
554 deferred_node.node = Some(Arc::new(new_node));
555 }
556 }
557 }
558 PlanNode::Subscription { primary: _, rest } => {
559 if let Some(node) = rest.as_mut() {
560 node.extract_authorization_metadata(schema, key);
561 }
562 }
563 PlanNode::Condition {
564 condition: _,
565 if_clause,
566 else_clause,
567 } => {
568 if let Some(node) = if_clause.as_mut() {
569 node.extract_authorization_metadata(schema, key);
570 }
571 if let Some(node) = else_clause.as_mut() {
572 node.extract_authorization_metadata(schema, key);
573 }
574 }
575 }
576 }
577}
578
579#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
581#[serde(rename_all = "camelCase")]
582pub(crate) struct FlattenNode {
583 pub(crate) path: Path,
585
586 pub(crate) node: Box<PlanNode>,
588}
589
590#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
592#[serde(rename_all = "camelCase")]
593pub(crate) struct Primary {
594 pub(crate) subselection: Option<String>,
597
598 pub(crate) node: Option<Box<PlanNode>>,
600}
601
602#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
606#[serde(rename_all = "camelCase")]
607pub(crate) struct DeferredNode {
608 pub(crate) depends: Vec<Depends>,
612
613 pub(crate) label: Option<String>,
615 pub(crate) query_path: Path,
617 pub(crate) subselection: Option<String>,
621 pub(crate) node: Option<Arc<PlanNode>>,
623}
624
625#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
627#[serde(rename_all = "camelCase")]
628pub(crate) struct Depends {
629 pub(crate) id: String,
630}
631
632#[cfg(test)]
633mod test {
634 use crate::query_planner::QueryPlan;
635
636 #[test]
637 fn test_estimated_size() {
638 let query_plan = QueryPlan::fake_builder().build();
639 let size1 = query_plan.estimated_size();
640 let size2 = query_plan.estimated_size();
641 assert!(size1 > 0);
642 assert_eq!(size1, size2);
643 }
644}