1use crate::coordinator::{DistributedExec, MetricsStore};
2use crate::execution_plans::{DistributedLeafExec, NetworkCoalesceExec};
3use crate::metrics::DISTRIBUTED_DATAFUSION_TASK_ID_LABEL;
4use datafusion::common::{HashMap, config_err};
5use datafusion::common::{exec_err, plan_err};
6use datafusion::error::Result;
7use datafusion::execution::{SendableRecordBatchStream, TaskContext};
8use datafusion::physical_plan::display::DisplayableExecutionPlan;
9use datafusion::physical_plan::metrics::{Label, Metric, MetricsSet};
10use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties, displayable};
11use itertools::Either;
12use std::collections::VecDeque;
13use std::sync::Arc;
14use url::Url;
15use uuid::Uuid;
16
17#[derive(Debug, Clone)]
71pub enum Stage {
72 Local(LocalStage),
73 Remote(RemoteStage),
74}
75
76#[derive(Debug, Clone)]
77pub struct LocalStage {
78 pub query_id: Uuid,
80 pub num: usize,
82 pub plan: Arc<dyn ExecutionPlan>,
85 pub tasks: usize,
87}
88
89impl LocalStage {
90 pub fn execute(
91 &self,
92 partition: usize,
93 context: Arc<TaskContext>,
94 ) -> Result<SendableRecordBatchStream> {
95 if self.tasks > 1 {
96 return exec_err!("Cannot execute a local stage with more than 1 task");
97 }
98 self.plan.execute(partition, context)
99 }
100}
101
102#[derive(Debug, Clone)]
103pub struct RemoteStage {
104 pub query_id: Uuid,
106 pub num: usize,
108 pub workers: Vec<Url>,
110}
111
112impl Stage {
113 pub fn query_id(&self) -> Uuid {
114 match &self {
115 Self::Local(v) => v.query_id,
116 Self::Remote(v) => v.query_id,
117 }
118 }
119
120 pub fn num(&self) -> usize {
121 match &self {
122 Self::Local(v) => v.num,
123 Self::Remote(v) => v.num,
124 }
125 }
126
127 pub fn task_count(&self) -> usize {
128 match &self {
129 Self::Local(v) => v.tasks,
130 Self::Remote(v) => v.workers.len(),
131 }
132 }
133
134 pub fn local_plan(&self) -> Option<&Arc<dyn ExecutionPlan>> {
135 match &self {
136 Self::Local(v) => Some(&v.plan),
137 Self::Remote(_) => None,
138 }
139 }
140}
141
142#[derive(Debug, Clone, Copy, PartialEq)]
143pub struct DistributedTaskContext {
144 pub task_index: usize,
145 pub task_count: usize,
146}
147
148impl DistributedTaskContext {
149 pub fn from_ctx(ctx: &Arc<TaskContext>) -> Arc<Self> {
150 ctx.session_config()
151 .get_extension::<Self>()
152 .unwrap_or(Arc::new(DistributedTaskContext {
153 task_index: 0,
154 task_count: 1,
155 }))
156 }
157}
158
159use crate::common::serialize_uuid;
160use crate::metrics::proto::metric_proto_to_df;
161use crate::worker::generated::worker as pb;
162use crate::{DistributedMetricsFormat, NetworkShuffleExec, rewrite_distributed_plan_with_metrics};
163use crate::{NetworkBoundary, NetworkBoundaryExt};
164use datafusion::common::DataFusionError;
165use datafusion::physical_expr::Partitioning;
166use std::fmt::Write;
179
180pub async fn explain_analyze(
182 executed: Arc<dyn ExecutionPlan>,
183 format: DistributedMetricsFormat,
184) -> Result<String, DataFusionError> {
185 match executed.downcast_ref::<DistributedExec>() {
186 None => Ok(DisplayableExecutionPlan::with_metrics(executed.as_ref())
187 .indent(true)
188 .to_string()),
189 Some(_) => {
190 let executed = rewrite_distributed_plan_with_metrics(executed.clone(), format).await?;
191 Ok(display_plan_ascii(executed.as_ref(), true))
192 }
193 }
194}
195
196const LTCORNER: &str = "┌"; const LDCORNER: &str = "└"; const VERTICAL: &str = "│"; const HORIZONTAL: &str = "─"; pub fn display_plan_ascii(plan: &dyn ExecutionPlan, show_metrics: bool) -> String {
202 if let Some(plan) = plan.downcast_ref::<DistributedExec>() {
203 let mut f = String::new();
204 display_ascii(plan, Either::Left(plan), 0, show_metrics, &mut f).unwrap();
205 f
206 } else {
207 match show_metrics {
208 true => DisplayableExecutionPlan::with_metrics(plan)
209 .indent(true)
210 .to_string(),
211 false => displayable(plan).indent(true).to_string(),
212 }
213 }
214}
215
216fn display_ascii(
217 root: &DistributedExec,
218 stage: Either<&DistributedExec, &Stage>,
219 depth: usize,
220 show_metrics: bool,
221 f: &mut String,
222) -> std::fmt::Result {
223 let plan = match stage {
224 Either::Left(distributed_exec) => distributed_exec.children().first().unwrap(),
225 Either::Right(stage) => {
226 let Some(plan) = stage.local_plan() else {
227 return write!(f, "StageExec: encoded input plan");
228 };
229 plan
230 }
231 };
232 match stage {
233 Either::Left(dist_exec) => {
234 write!(
235 f,
236 "{}{}{} DistributedExec {} {}",
237 " ".repeat(depth),
238 LTCORNER,
239 HORIZONTAL.repeat(5),
240 HORIZONTAL.repeat(2),
241 format_tasks_for_stage(1, plan),
242 )?;
243 if show_metrics && let Some(metrics) = dist_exec.metrics() {
244 write!(f, " ")?;
245 writeln!(f, "{}", format_metrics_by_task(&metrics))?;
246 } else {
247 writeln!(f)?;
248 }
249 }
250 Either::Right(stage) => {
251 write!(
252 f,
253 "{}{}{} Stage {} {} {}",
254 " ".repeat(depth),
255 LTCORNER,
256 HORIZONTAL.repeat(5),
257 stage.num(),
258 HORIZONTAL.repeat(2),
259 format_tasks_for_stage(stage.task_count(), plan)
260 )?;
261 if show_metrics && let Some(metrics_store) = &root.metrics_store {
262 let metrics = gather_stage_header_metrics(stage, metrics_store);
263 write!(f, " ")?;
264 writeln!(f, "{}", format_metrics_by_task(&metrics))?;
265 } else {
266 writeln!(f)?;
267 }
268 }
269 }
270
271 let mut plan_str = String::new();
272 display_inner_ascii(plan, 0, show_metrics, &mut plan_str)?;
273 let plan_str = plan_str
274 .split('\n')
275 .filter(|v| !v.is_empty())
276 .collect::<Vec<_>>()
277 .join(&format!("\n{}{}", " ".repeat(depth), VERTICAL));
278 writeln!(f, "{}{}{}", " ".repeat(depth), VERTICAL, plan_str)?;
279 writeln!(
280 f,
281 "{}{}{}",
282 " ".repeat(depth),
283 LDCORNER,
284 HORIZONTAL.repeat(50)
285 )?;
286 for input_stage in find_input_stages(plan.as_ref()) {
287 display_ascii(root, Either::Right(input_stage), depth + 1, show_metrics, f)?;
288 }
289 Ok(())
290}
291
292fn display_inner_ascii(
293 plan: &Arc<dyn ExecutionPlan>,
294 indent: usize,
295 show_metrics: bool,
296 f: &mut String,
297) -> std::fmt::Result {
298 if plan.is::<DistributedLeafExec>() {
299 return display_inner_distributed_leaf(plan, indent, show_metrics, f);
300 }
301
302 let node_str = displayable(plan.as_ref()).one_line().to_string();
303 let metrics_str = match show_metrics {
304 true => metrics_suffix(plan.metrics().map(|m| format_metrics_by_task(&m))),
305 false => String::new(),
306 };
307 writeln!(
308 f,
309 "{} {}{metrics_str}",
310 " ".repeat(indent),
311 node_str.trim_end() )?;
313
314 if plan.is_network_boundary() {
315 return Ok(());
316 }
317
318 for child in plan.children() {
319 display_inner_ascii(child, indent + 2, show_metrics, f)?;
320 }
321 Ok(())
322}
323
324fn display_inner_distributed_leaf(
325 plan: &Arc<dyn ExecutionPlan>,
326 indent: usize,
327 show_metrics: bool,
328 f: &mut String,
329) -> std::fmt::Result {
330 let Some(leaf) = plan.downcast_ref::<DistributedLeafExec>() else {
331 return Ok(());
332 };
333 let indent = " ".repeat(indent);
334
335 if let Some(by_task) = show_metrics
340 .then(|| plan.metrics())
341 .flatten()
342 .map(|m| metrics_by_task_id(&m))
343 && !by_task.is_empty()
344 {
345 writeln!(f, "{indent} DistributedLeafExec:")?;
346 for (task_i, variant) in leaf.variants.iter().enumerate() {
347 let variant = displayable(variant.as_ref()).one_line().to_string();
348 let metrics = match by_task.is_empty() {
349 true => String::new(),
350 false => metrics_suffix(by_task.get(&task_i).map(format_metrics_by_task)),
351 };
352 writeln!(f, "{indent} t{task_i}: {}{metrics}", variant.trim_end())?;
353 }
354 } else {
355 let header = match show_metrics {
356 true => metrics_suffix(plan.metrics().map(|m| format_metrics_by_task(&m))),
357 false => String::new(),
358 };
359 writeln!(f, "{indent} DistributedLeafExec:{header}")?;
360 for (task_i, variant) in leaf.variants.iter().enumerate() {
361 let variant = displayable(variant.as_ref()).one_line().to_string();
362 writeln!(f, "{indent} t{task_i}: {}", variant.trim_end())?;
363 }
364 }
365 Ok(())
366}
367
368fn gather_stage_header_metrics(stage: &Stage, metrics_store: &MetricsStore) -> MetricsSet {
371 let mut task_key = pb::TaskKey {
372 query_id: serialize_uuid(&stage.query_id()),
373 stage_id: stage.num() as u64,
374 task_number: 0,
375 };
376 let mut all_metrics = MetricsSet::new();
377 while let Some(metrics_set) = metrics_store.get(&task_key).and_then(|v| v.task_metrics) {
378 for mut metric in metrics_set.metrics {
379 metric.labels.push(pb::Label {
380 name: DISTRIBUTED_DATAFUSION_TASK_ID_LABEL.to_string(),
381 value: task_key.task_number.to_string(),
382 });
383 if let Ok(metric) = metric_proto_to_df(metric) {
384 all_metrics.push(metric)
385 };
386 }
387 task_key.task_number += 1;
388 }
389 all_metrics
390}
391
392fn aggregate_by_task_id(metrics: &MetricsSet) -> MetricsSet {
399 let mut map: HashMap<(String, Option<String>), Metric> = HashMap::new();
401
402 for metric in metrics.iter() {
403 let name = metric.value().name().to_string();
404 let task_id = metric
405 .labels()
406 .iter()
407 .find(|l| l.name() == DISTRIBUTED_DATAFUSION_TASK_ID_LABEL)
408 .map(|l| l.value().to_string());
409
410 let key = (name, task_id.clone());
411
412 map.entry(key)
413 .and_modify(|accum| {
414 accum.value_mut().aggregate(metric.value());
415 })
416 .or_insert_with(|| {
417 let labels = task_id
418 .map(|id| vec![Label::new(DISTRIBUTED_DATAFUSION_TASK_ID_LABEL, id)])
419 .unwrap_or_default();
420 let mut accum = Metric::new_with_labels(
421 metric.value().new_empty(),
422 None, labels,
424 );
425 accum.value_mut().aggregate(metric.value());
426 accum
427 });
428 }
429
430 let mut result = MetricsSet::new();
431 for (_, metric) in map {
432 result.push(Arc::new(metric));
433 }
434 result
435}
436
437fn sorted_for_display_by_task_id(metrics: MetricsSet) -> MetricsSet {
443 let mut vec: Vec<Arc<Metric>> = metrics.iter().cloned().collect();
444 vec.sort_unstable_by_key(|metric| {
445 let task_id = metric
446 .labels()
447 .iter()
448 .find(|l| l.name() == DISTRIBUTED_DATAFUSION_TASK_ID_LABEL)
449 .and_then(|l| l.value().parse::<u64>().ok());
450 (
451 metric.value().display_sort_key(),
452 metric.value().name().to_owned(),
453 task_id,
454 )
455 });
456 let mut result = MetricsSet::new();
457 for m in vec {
458 result.push(m);
459 }
460 result
461}
462
463fn format_metrics_by_task(metrics: &MetricsSet) -> String {
473 let aggregated = aggregate_by_task_id(metrics);
474 let sorted = sorted_for_display_by_task_id(aggregated).timestamps_removed();
475
476 sorted
477 .iter()
478 .map(|m| {
479 let name = m.value().name();
480 let task_id = m
481 .labels()
482 .iter()
483 .find(|l| l.name() == DISTRIBUTED_DATAFUSION_TASK_ID_LABEL)
484 .map(|l| l.value());
485
486 match task_id {
487 Some(id) => format!("{name}_{id}={}", m.value()),
488 None => format!("{name}={}", m.value()),
489 }
490 })
491 .collect::<Vec<_>>()
492 .join(", ")
493}
494
495fn metrics_suffix(formatted: Option<String>) -> String {
498 match formatted.unwrap_or_default() {
499 s if s.is_empty() => ", metrics=[]".to_string(),
500 s => format!(", metrics=[{s}]"),
501 }
502}
503
504fn metrics_by_task_id(metrics: &MetricsSet) -> HashMap<usize, MetricsSet> {
508 let mut map: HashMap<usize, MetricsSet> = HashMap::new();
509 for metric in metrics.iter() {
510 let Some(task_id) = metric
511 .labels()
512 .iter()
513 .find(|l| l.name() == DISTRIBUTED_DATAFUSION_TASK_ID_LABEL)
514 .and_then(|l| l.value().parse::<usize>().ok())
515 else {
516 continue;
517 };
518 map.entry(task_id).or_default().push(Arc::clone(metric));
519 }
520 map
521}
522
523fn format_tasks_for_stage(n_tasks: usize, head: &Arc<dyn ExecutionPlan>) -> String {
524 let partitioning = head.properties().output_partitioning();
525 let input_partitions = partitioning.partition_count();
526 let hash_shuffle = matches!(partitioning, Partitioning::Hash(_, _));
527 let mut tasks = Vec::with_capacity(n_tasks);
528 let mut off = 0;
529 for i in 0..n_tasks {
530 let end = off + input_partitions - 1;
531 let partitions = if input_partitions == 1 {
532 format!("p{off}")
533 } else {
534 format!("p{off}..p{end}")
535 };
536 tasks.push(format!("t{i}:[{partitions}]"));
537 off += if hash_shuffle { 0 } else { input_partitions }
538 }
539 format!("Tasks: {}", tasks.join(" "))
540}
541
542const NUM_COLORS: usize = 6;
545const COLOR_SCHEME: &str = "spectral6";
546
547pub fn display_plan_graphviz(plan: Arc<dyn ExecutionPlan>) -> Result<String> {
554 let mut f = String::new();
555
556 writeln!(
557 f,
558 "digraph G {{
559 rankdir=BT
560 edge[colorscheme={COLOR_SCHEME}, penwidth=2.0]
561 splines=false
562"
563 )?;
564
565 if plan.is::<DistributedExec>() {
566 let mut max_num = 0;
567 let mut all_stages = find_all_stages(&plan)
568 .into_iter()
569 .inspect(|v| max_num = max_num.max(v.num()))
570 .collect::<Vec<_>>();
571 let head_stage = Stage::Local(LocalStage {
572 query_id: Default::default(),
573 num: max_num + 1,
574 plan: plan.clone(),
575 tasks: 1,
576 });
577 all_stages.insert(0, &head_stage);
578
579 for stage in &all_stages {
581 for i in 0..stage.task_count() {
582 let p = display_single_task(stage, i)?;
583 writeln!(f, "{p}")?;
584 }
585 }
586 for stage in &all_stages {
588 let Some(plan) = stage.local_plan() else {
589 continue;
590 };
591 for input_stage in find_input_stages(plan.as_ref()) {
592 for task_i in 0..stage.task_count() {
593 for input_task_i in 0..input_stage.task_count() {
594 let edges =
595 display_inter_task_edges(stage, task_i, input_stage, input_task_i)?;
596 writeln!(
597 f,
598 "// edges from child stage {} task {} to stage {} task {}\n {}",
599 input_stage.num(),
600 input_task_i,
601 stage.num(),
602 task_i,
603 edges
604 )?;
605 }
606 }
607 }
608 }
609 } else {
610 writeln!(f, "node[shape=none]")?;
612 let p = display_plan(&plan, 0, 1, 0)?;
613 writeln!(f, "{p}")?;
614 }
615
616 writeln!(f, "}}")?;
617
618 Ok(f)
619}
620
621fn display_single_task(stage: &Stage, task_i: usize) -> Result<String> {
622 let Some(plan) = stage.local_plan() else {
623 return config_err!("plan not present");
624 };
625 let partition_group =
626 build_partition_group(task_i, plan.output_partitioning().partition_count());
627
628 let mut f = String::new();
629 writeln!(
630 f,
631 "
632 subgraph \"cluster_stage_{}_task_{}_margin\" {{
633 style=invis
634 margin=20.0
635 subgraph \"cluster_stage_{}_task_{}\" {{
636 color=blue
637 style=dotted
638 label = \"Stage {} Task {} Partitions {}\"
639 labeljust=r
640 labelloc=b
641
642 node[shape=none]
643
644",
645 stage.num(),
646 task_i,
647 stage.num(),
648 task_i,
649 stage.num(),
650 task_i,
651 format_pg(&partition_group)
652 )?;
653
654 writeln!(
655 f,
656 "{}",
657 display_plan(plan, task_i, stage.task_count(), stage.num())?
658 )?;
659 writeln!(f, " }}")?;
660 writeln!(f, " }}")?;
661
662 Ok(f)
663}
664
665fn display_plan(
666 plan: &Arc<dyn ExecutionPlan>,
667 task_i: usize,
668 _n_tasks: usize,
669 stage_num: usize,
670) -> Result<String> {
671 let mut queue = VecDeque::from([plan]);
676 let mut node_index = 0;
677
678 let mut f = String::new();
679 while let Some(plan) = queue.pop_front() {
680 node_index += 1;
681 let p = display_single_plan(plan.as_ref(), stage_num, task_i, node_index)?;
682 writeln!(f, "{p}")?;
683
684 if plan.is_network_boundary() {
685 continue;
686 }
687 for child in plan.children().iter() {
688 queue.push_back(child);
689 }
690 }
691
692 type PlanWithParent<'a> = (
694 &'a Arc<dyn ExecutionPlan>,
695 Option<&'a Arc<dyn ExecutionPlan>>,
696 usize,
697 );
698 let mut queue: VecDeque<PlanWithParent> = VecDeque::from([(plan, None, 0usize)]);
699 node_index = 0;
700 while let Some((plan, maybe_parent, parent_idx)) = queue.pop_front() {
701 node_index += 1;
702 if let Some(parent) = maybe_parent {
703 let output_partitions = plan.output_partitioning().partition_count();
704
705 for i in 0..output_partitions {
706 let style = "";
707
708 writeln!(
709 f,
710 " {}_{}_{}_{}:t{}:n -> {}_{}_{}_{}:b{}:s {}[color={}]",
711 plan.name(),
712 stage_num,
713 task_i,
714 node_index,
715 i,
716 parent.name(),
717 stage_num,
718 task_i,
719 parent_idx,
720 i,
721 style,
722 i % NUM_COLORS + 1
723 )?;
724 }
725 }
726
727 if plan.as_ref().is_network_boundary() {
728 continue;
729 }
730
731 for child in plan.children() {
732 queue.push_back((child, Some(plan), node_index));
733 }
734 }
735 Ok(f)
736}
737
738pub fn display_single_plan(
782 plan: &(dyn ExecutionPlan + 'static),
783 stage_num: usize,
784 task_i: usize,
785 node_index: usize,
786) -> Result<String> {
787 let mut f = String::new();
788 let output_partitions = plan.output_partitioning().partition_count();
789 let input_partitions = if plan.is_network_boundary() {
790 output_partitions
791 } else if let Some(child) = plan.children().first() {
792 child.output_partitioning().partition_count()
793 } else {
794 1
795 };
796
797 writeln!(
798 f,
799 "
800 {}_{}_{}_{} [label=<
801 <TABLE BORDER='0' CELLBORDER='0' CELLSPACING='0' CELLPADDING='0'>
802 <TR>
803 <TD CELLBORDER='0'>
804 <TABLE BORDER='0' CELLBORDER='1' CELLSPACING='0'>
805 <TR>",
806 plan.name(),
807 stage_num,
808 task_i,
809 node_index
810 )?;
811
812 for i in 0..output_partitions {
813 writeln!(f, " <TD PORT='t{i}'></TD>")?;
814 }
815
816 writeln!(
817 f,
818 " </TR>
819 </TABLE>
820 </TD>
821 </TR>
822 <TR>
823 <TD BORDER='0' CELLPADDING='0' CELLSPACING='0'>
824 <TABLE BORDER='0' CELLBORDER='1' CELLSPACING='0'>
825 <TR>
826 <TD>{}</TD>
827 </TR>
828 </TABLE>
829 </TD>
830 </TR>
831 <TR>
832 <TD CELLBORDER='0'>
833 <TABLE BORDER='0' CELLBORDER='1' CELLSPACING='0'>
834 <TR>",
835 plan.name()
836 )?;
837
838 for i in 0..input_partitions {
839 writeln!(f, " <TD PORT='b{i}'></TD>")?;
840 }
841
842 writeln!(
843 f,
844 " </TR>
845 </TABLE>
846 </TD>
847 </TR>
848 </TABLE>
849 >];
850"
851 )?;
852 Ok(f)
853}
854
855fn display_inter_task_edges(
856 stage: &Stage,
857 task_i: usize,
858 input_stage: &Stage,
859 input_task_i: usize,
860) -> Result<String> {
861 let Some(plan) = stage.local_plan() else {
862 return plan_err!("The inner plan of a stage was encoded.");
863 };
864 let Some(input_plan) = input_stage.local_plan() else {
865 return plan_err!("The inner plan of a stage was encoded.");
866 };
867 let mut f = String::new();
868
869 let mut queue = VecDeque::from([plan]);
870 let mut index = 0;
871 while let Some(plan) = queue.pop_front() {
872 index += 1;
873 if let Some(node) = plan.downcast_ref::<NetworkShuffleExec>() {
874 if node.input_stage().num() != input_stage.num() {
875 continue;
876 }
877 let output_partitions = plan.output_partitioning().partition_count();
879 for p in 0..output_partitions {
880 writeln!(
881 f,
882 " {}_{}_{}_{}:t{}:n -> {}_{}_{}_{}:b{}:s [color={}]",
883 input_plan.name(),
884 input_stage.num(),
885 input_task_i,
886 1, p + (task_i * output_partitions),
888 plan.name(),
889 stage.num(),
890 task_i,
891 index,
892 p,
893 p % NUM_COLORS + 1
894 )?;
895 }
896 continue;
897 } else if let Some(node) = plan.downcast_ref::<NetworkCoalesceExec>() {
898 if node.input_stage().num() != input_stage.num() {
899 continue;
900 }
901 let output_partitions = plan.output_partitioning().partition_count();
903 let input_partitions_per_task = output_partitions / input_stage.task_count();
904 for p in 0..input_partitions_per_task {
905 writeln!(
906 f,
907 " {}_{}_{}_{}:t{}:n -> {}_{}_{}_{}:b{}:s [color={}]",
908 input_plan.name(),
909 input_stage.num(),
910 input_task_i,
911 1, p,
913 plan.name(),
914 stage.num(),
915 task_i,
916 index,
917 p + (input_task_i * input_partitions_per_task),
918 p % NUM_COLORS + 1
919 )?;
920 }
921 continue;
922 }
923
924 for child in plan.children() {
925 queue.push_back(child);
926 }
927 }
928
929 Ok(f)
930}
931
932fn format_pg(partition_group: &[usize]) -> String {
933 partition_group
934 .iter()
935 .map(|pg| format!("{pg}"))
936 .collect::<Vec<_>>()
937 .join("_")
938}
939
940fn build_partition_group(task_i: usize, partitions: usize) -> Vec<usize> {
941 ((task_i * partitions)..((task_i + 1) * partitions)).collect::<Vec<_>>()
942}
943
944fn find_input_stages(plan: &dyn ExecutionPlan) -> Vec<&Stage> {
945 let mut result = vec![];
946 for child in plan.children() {
947 if let Some(plan) = child.as_network_boundary() {
948 result.push(plan.input_stage());
949 } else {
950 result.extend(find_input_stages(child.as_ref()));
951 }
952 }
953 result
954}
955
956pub(crate) fn find_all_stages(plan: &Arc<dyn ExecutionPlan>) -> Vec<&Stage> {
957 let mut result = vec![];
958 if let Some(plan) = plan.as_network_boundary() {
959 result.push(plan.input_stage());
960 }
961 for child in plan.children() {
962 result.extend(find_all_stages(child));
963 }
964 result
965}