Skip to main content

datafusion_distributed/
stage.rs

1use crate::coordinator::{DistributedExec, MetricsStore};
2use crate::execution_plans::{DistributedLeafExec, NetworkCoalesceExec};
3use crate::metrics::DISTRIBUTED_DATAFUSION_TASK_ID_LABEL;
4use datafusion::common::{HashMap, config_err};
5use datafusion::common::{exec_err, plan_err};
6use datafusion::error::Result;
7use datafusion::execution::{SendableRecordBatchStream, TaskContext};
8use datafusion::physical_plan::display::DisplayableExecutionPlan;
9use datafusion::physical_plan::metrics::{Label, Metric, MetricsSet};
10use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties, displayable};
11use itertools::Either;
12use std::collections::VecDeque;
13use std::sync::Arc;
14use url::Url;
15use uuid::Uuid;
16
17/// A unit of isolation for a portion of a physical execution plan
18/// that can be executed independently and across a network boundary.
19/// It implements [`ExecutionPlan`] and can be executed to produce a
20/// stream of record batches.
21///
22/// If a stage has input stages, then those input stages will be executed on remote resources
23/// and will be provided the remainder of the stage tree.
24///
25/// For example, if our stage tree looks like this:
26///
27/// ```text
28///                       ┌─────────┐
29///                       │ stage 1 │
30///                       └───┬─────┘
31///                           │
32///                    ┌──────┴────────┐
33///               ┌────┴────┐     ┌────┴────┐
34///               │ stage 2 │     │ stage 3 │
35///               └────┬────┘     └─────────┘
36///                    │
37///             ┌──────┴────────┐
38///        ┌────┴────┐     ┌────┴────┐
39///        │ stage 4 │     │ Stage 5 │
40///        └─────────┘     └─────────┘
41///
42/// ```
43///
44/// Then executing Stage 1 will run its plan locally. Stage 1 has two inputs, Stage 2 and Stage 3. We
45/// know these will execute on remote resources. As such, the plan for Stage 1 must contain a
46/// [`NetworkShuffleExec`] node that will read the results of Stage 2 and Stage 3 and coalesce the
47/// results.
48///
49/// When Stage 1's [`NetworkShuffleExec`] node is executed, it makes an ArrowFlightRequest to the
50/// host assigned in the Stage. It provides the following Stage tree serialized in the body of the
51/// Arrow Flight Ticket:
52///
53/// ```text
54///               ┌─────────┐
55///               │ Stage 2 │
56///               └────┬────┘
57///                    │
58///             ┌──────┴────────┐
59///        ┌────┴────┐     ┌────┴────┐
60///        │ Stage 4 │     │ Stage 5 │
61///        └─────────┘     └─────────┘
62///
63/// ```
64///
65/// The receiving Worker will then execute Stage 2 and will repeat this process.
66///
67/// When Stage 4 is executed, it has no input tasks, so it is assumed that the plan included in that
68/// Stage can complete on its own; it's likely holding a leaf node in the overall physical plan and
69/// producing data from a [`DataSourceExec`].
#[derive(Debug, Clone)]
pub enum Stage {
    /// A stage whose physical plan is held in this process (see [`LocalStage`]).
    Local(LocalStage),
    /// A stage described only by the worker URLs it is issued to (see [`RemoteStage`]). It
    /// carries no plan, so [`Stage::local_plan`] returns `None` for it.
    Remote(RemoteStage),
}
75
/// A stage whose physical plan is available locally.
///
/// [`LocalStage::execute`] refuses to run the plan when the stage has more than one task.
#[derive(Debug, Clone)]
pub struct LocalStage {
    /// Our query_id
    pub query_id: Uuid,
    /// Our stage number
    pub num: usize,
    /// The physical execution plan that this stage will execute. It is only available when the
    /// stage is accessed through the coordinating stage.
    pub plan: Arc<dyn ExecutionPlan>,
    /// The number of tasks the stage has.
    pub tasks: usize,
}
88
89impl LocalStage {
90    pub fn execute(
91        &self,
92        partition: usize,
93        context: Arc<TaskContext>,
94    ) -> Result<SendableRecordBatchStream> {
95        if self.tasks > 1 {
96            return exec_err!("Cannot execute a local stage with more than 1 task");
97        }
98        self.plan.execute(partition, context)
99    }
100}
101
/// A stage that is known only by the workers it is issued to; it holds no physical plan.
///
/// Its task count, as reported by [`Stage::task_count`], is the number of worker URLs.
#[derive(Debug, Clone)]
pub struct RemoteStage {
    /// Our query_id
    pub query_id: Uuid,
    /// Our stage number
    pub num: usize,
    /// The worker URLs to which queries should be issued.
    pub workers: Vec<Url>,
}
111
112impl Stage {
113    pub fn query_id(&self) -> Uuid {
114        match &self {
115            Self::Local(v) => v.query_id,
116            Self::Remote(v) => v.query_id,
117        }
118    }
119
120    pub fn num(&self) -> usize {
121        match &self {
122            Self::Local(v) => v.num,
123            Self::Remote(v) => v.num,
124        }
125    }
126
127    pub fn task_count(&self) -> usize {
128        match &self {
129            Self::Local(v) => v.tasks,
130            Self::Remote(v) => v.workers.len(),
131        }
132    }
133
134    pub fn local_plan(&self) -> Option<&Arc<dyn ExecutionPlan>> {
135        match &self {
136            Self::Local(v) => Some(&v.plan),
137            Self::Remote(_) => None,
138        }
139    }
140}
141
/// Identifies one task among the tasks of a stage.
///
/// [`DistributedTaskContext::from_ctx`] reads it from the session config extensions and falls
/// back to task 0 of 1 when it is absent.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct DistributedTaskContext {
    /// Index of this task (defaults to 0 when no context is registered).
    pub task_index: usize,
    /// Total number of tasks (defaults to 1 when no context is registered).
    pub task_count: usize,
}
147
148impl DistributedTaskContext {
149    pub fn from_ctx(ctx: &Arc<TaskContext>) -> Arc<Self> {
150        ctx.session_config()
151            .get_extension::<Self>()
152            .unwrap_or(Arc::new(DistributedTaskContext {
153                task_index: 0,
154                task_count: 1,
155            }))
156    }
157}
158
159use crate::common::serialize_uuid;
160use crate::metrics::proto::metric_proto_to_df;
161use crate::worker::generated::worker as pb;
162use crate::{DistributedMetricsFormat, NetworkShuffleExec, rewrite_distributed_plan_with_metrics};
163use crate::{NetworkBoundary, NetworkBoundaryExt};
164use datafusion::common::DataFusionError;
165use datafusion::physical_expr::Partitioning;
// Rendering a nice tree for stages.
//
// The challenge to doing this at the moment is that `TreeRenderVisitor`
// in `datafusion::physical_plan::display` is not public, and that it is also
// specific to an `ExecutionPlan` trait object, which we don't have.
//
// TODO: try to upstream a change to make rendering of trees (logical, physical, stages) work
// against a generic trait rather than a specific trait object. This would allow us to
// use the same rendering code for all trees, including stages.
//
// In the meantime, we can make a dummy ExecutionPlan that will let us render
// the Stage tree.
178use std::fmt::Write;
179
180/// explain_analyze renders an [ExecutionPlan] with metrics.
181pub async fn explain_analyze(
182    executed: Arc<dyn ExecutionPlan>,
183    format: DistributedMetricsFormat,
184) -> Result<String, DataFusionError> {
185    match executed.downcast_ref::<DistributedExec>() {
186        None => Ok(DisplayableExecutionPlan::with_metrics(executed.as_ref())
187            .indent(true)
188            .to_string()),
189        Some(_) => {
190            let executed = rewrite_distributed_plan_with_metrics(executed.clone(), format).await?;
191            Ok(display_plan_ascii(executed.as_ref(), true))
192        }
193    }
194}
195
// Unicode box-drawing characters for creating borders and connections. They are used by
// `display_ascii` to frame each stage in the text rendering of a distributed plan.
const LTCORNER: &str = "┌"; // Left top corner: starts a stage header line
const LDCORNER: &str = "└"; // Left bottom corner: starts the closing border of a stage
const VERTICAL: &str = "│"; // Vertical line: left edge of every plan line inside a stage
const HORIZONTAL: &str = "─"; // Horizontal line: repeated to build header and closing borders
201pub fn display_plan_ascii(plan: &dyn ExecutionPlan, show_metrics: bool) -> String {
202    if let Some(plan) = plan.downcast_ref::<DistributedExec>() {
203        let mut f = String::new();
204        display_ascii(plan, Either::Left(plan), 0, show_metrics, &mut f).unwrap();
205        f
206    } else {
207        match show_metrics {
208            true => DisplayableExecutionPlan::with_metrics(plan)
209                .indent(true)
210                .to_string(),
211            false => displayable(plan).indent(true).to_string(),
212        }
213    }
214}
215
216fn display_ascii(
217    root: &DistributedExec,
218    stage: Either<&DistributedExec, &Stage>,
219    depth: usize,
220    show_metrics: bool,
221    f: &mut String,
222) -> std::fmt::Result {
223    let plan = match stage {
224        Either::Left(distributed_exec) => distributed_exec.children().first().unwrap(),
225        Either::Right(stage) => {
226            let Some(plan) = stage.local_plan() else {
227                return write!(f, "StageExec: encoded input plan");
228            };
229            plan
230        }
231    };
232    match stage {
233        Either::Left(dist_exec) => {
234            write!(
235                f,
236                "{}{}{} DistributedExec {} {}",
237                "  ".repeat(depth),
238                LTCORNER,
239                HORIZONTAL.repeat(5),
240                HORIZONTAL.repeat(2),
241                format_tasks_for_stage(1, plan),
242            )?;
243            if show_metrics && let Some(metrics) = dist_exec.metrics() {
244                write!(f, " ")?;
245                writeln!(f, "{}", format_metrics_by_task(&metrics))?;
246            } else {
247                writeln!(f)?;
248            }
249        }
250        Either::Right(stage) => {
251            write!(
252                f,
253                "{}{}{} Stage {} {} {}",
254                "  ".repeat(depth),
255                LTCORNER,
256                HORIZONTAL.repeat(5),
257                stage.num(),
258                HORIZONTAL.repeat(2),
259                format_tasks_for_stage(stage.task_count(), plan)
260            )?;
261            if show_metrics && let Some(metrics_store) = &root.metrics_store {
262                let metrics = gather_stage_header_metrics(stage, metrics_store);
263                write!(f, " ")?;
264                writeln!(f, "{}", format_metrics_by_task(&metrics))?;
265            } else {
266                writeln!(f)?;
267            }
268        }
269    }
270
271    let mut plan_str = String::new();
272    display_inner_ascii(plan, 0, show_metrics, &mut plan_str)?;
273    let plan_str = plan_str
274        .split('\n')
275        .filter(|v| !v.is_empty())
276        .collect::<Vec<_>>()
277        .join(&format!("\n{}{}", "  ".repeat(depth), VERTICAL));
278    writeln!(f, "{}{}{}", "  ".repeat(depth), VERTICAL, plan_str)?;
279    writeln!(
280        f,
281        "{}{}{}",
282        "  ".repeat(depth),
283        LDCORNER,
284        HORIZONTAL.repeat(50)
285    )?;
286    for input_stage in find_input_stages(plan.as_ref()) {
287        display_ascii(root, Either::Right(input_stage), depth + 1, show_metrics, f)?;
288    }
289    Ok(())
290}
291
292fn display_inner_ascii(
293    plan: &Arc<dyn ExecutionPlan>,
294    indent: usize,
295    show_metrics: bool,
296    f: &mut String,
297) -> std::fmt::Result {
298    if plan.is::<DistributedLeafExec>() {
299        return display_inner_distributed_leaf(plan, indent, show_metrics, f);
300    }
301
302    let node_str = displayable(plan.as_ref()).one_line().to_string();
303    let metrics_str = match show_metrics {
304        true => metrics_suffix(plan.metrics().map(|m| format_metrics_by_task(&m))),
305        false => String::new(),
306    };
307    writeln!(
308        f,
309        "{} {}{metrics_str}",
310        " ".repeat(indent),
311        node_str.trim_end() // remove trailing newline
312    )?;
313
314    if plan.is_network_boundary() {
315        return Ok(());
316    }
317
318    for child in plan.children() {
319        display_inner_ascii(child, indent + 2, show_metrics, f)?;
320    }
321    Ok(())
322}
323
324fn display_inner_distributed_leaf(
325    plan: &Arc<dyn ExecutionPlan>,
326    indent: usize,
327    show_metrics: bool,
328    f: &mut String,
329) -> std::fmt::Result {
330    let Some(leaf) = plan.downcast_ref::<DistributedLeafExec>() else {
331        return Ok(());
332    };
333    let indent = " ".repeat(indent);
334
335    // The leaf node is wrapped in a `MetricsWrapperExec` by the metrics rewriter, so the
336    // per-task metrics live on `plan.metrics()` (the wrapper), not on `leaf.metrics()` (which
337    // delegates to the un-rewritten original). Split them by task id to show each variant's
338    // own metrics.
339    if let Some(by_task) = show_metrics
340        .then(|| plan.metrics())
341        .flatten()
342        .map(|m| metrics_by_task_id(&m))
343        && !by_task.is_empty()
344    {
345        writeln!(f, "{indent} DistributedLeafExec:")?;
346        for (task_i, variant) in leaf.variants.iter().enumerate() {
347            let variant = displayable(variant.as_ref()).one_line().to_string();
348            let metrics = match by_task.is_empty() {
349                true => String::new(),
350                false => metrics_suffix(by_task.get(&task_i).map(format_metrics_by_task)),
351            };
352            writeln!(f, "{indent}   t{task_i}: {}{metrics}", variant.trim_end())?;
353        }
354    } else {
355        let header = match show_metrics {
356            true => metrics_suffix(plan.metrics().map(|m| format_metrics_by_task(&m))),
357            false => String::new(),
358        };
359        writeln!(f, "{indent} DistributedLeafExec:{header}")?;
360        for (task_i, variant) in leaf.variants.iter().enumerate() {
361            let variant = displayable(variant.as_ref()).one_line().to_string();
362            writeln!(f, "{indent}   t{task_i}: {}", variant.trim_end())?;
363        }
364    }
365    Ok(())
366}
367
368/// Gathers the metrics global to a stage. These metrics are not specific to any plan node, and
369/// are instead global to a whole stage.
370fn gather_stage_header_metrics(stage: &Stage, metrics_store: &MetricsStore) -> MetricsSet {
371    let mut task_key = pb::TaskKey {
372        query_id: serialize_uuid(&stage.query_id()),
373        stage_id: stage.num() as u64,
374        task_number: 0,
375    };
376    let mut all_metrics = MetricsSet::new();
377    while let Some(metrics_set) = metrics_store.get(&task_key).and_then(|v| v.task_metrics) {
378        for mut metric in metrics_set.metrics {
379            metric.labels.push(pb::Label {
380                name: DISTRIBUTED_DATAFUSION_TASK_ID_LABEL.to_string(),
381                value: task_key.task_number.to_string(),
382            });
383            if let Ok(metric) = metric_proto_to_df(metric) {
384                all_metrics.push(metric)
385            };
386        }
387        task_key.task_number += 1;
388    }
389    all_metrics
390}
391
392/// Aggregates metrics by (name, task_id), preserving the [DISTRIBUTED_DATAFUSION_TASK_ID_LABEL]
393/// only. Metrics without a task_id label (ie. non distributed metrics) are aggregated together.
394///
395/// For a non-distributed plan, this is equivalent to [MetricsSet::aggregate_by_name] since there
396/// will be no task ids. For a distributed plan, it's expected that the metrics rewriter populated
397/// task id labels in all metrics.
398fn aggregate_by_task_id(metrics: &MetricsSet) -> MetricsSet {
399    // Key: (metric_name, Option<task_id>)
400    let mut map: HashMap<(String, Option<String>), Metric> = HashMap::new();
401
402    for metric in metrics.iter() {
403        let name = metric.value().name().to_string();
404        let task_id = metric
405            .labels()
406            .iter()
407            .find(|l| l.name() == DISTRIBUTED_DATAFUSION_TASK_ID_LABEL)
408            .map(|l| l.value().to_string());
409
410        let key = (name, task_id.clone());
411
412        map.entry(key)
413            .and_modify(|accum| {
414                accum.value_mut().aggregate(metric.value());
415            })
416            .or_insert_with(|| {
417                let labels = task_id
418                    .map(|id| vec![Label::new(DISTRIBUTED_DATAFUSION_TASK_ID_LABEL, id)])
419                    .unwrap_or_default();
420                let mut accum = Metric::new_with_labels(
421                    metric.value().new_empty(),
422                    None, // no partition
423                    labels,
424                );
425                accum.value_mut().aggregate(metric.value());
426                accum
427            });
428    }
429
430    let mut result = MetricsSet::new();
431    for (_, metric) in map {
432        result.push(Arc::new(metric));
433    }
434    result
435}
436
437/// Sorts metrics by display priority, then name, then by task_id (numerically).
438///
439/// For a non-distributed plan, this is equivalent to [MetricsSet::sorted_for_display] since there
440/// will be no task ids. For a distributed plan, it's expected that the metrics rewriter populated
441/// task id labels in all metrics.
442fn sorted_for_display_by_task_id(metrics: MetricsSet) -> MetricsSet {
443    let mut vec: Vec<Arc<Metric>> = metrics.iter().cloned().collect();
444    vec.sort_unstable_by_key(|metric| {
445        let task_id = metric
446            .labels()
447            .iter()
448            .find(|l| l.name() == DISTRIBUTED_DATAFUSION_TASK_ID_LABEL)
449            .and_then(|l| l.value().parse::<u64>().ok());
450        (
451            metric.value().display_sort_key(),
452            metric.value().name().to_owned(),
453            task_id,
454        )
455    });
456    let mut result = MetricsSet::new();
457    for m in vec {
458        result.push(m);
459    }
460    result
461}
462
463/// Formats metrics as "{metric_name}_{task_id}={value}, {metric_name}_{task_id}={value}"
464/// e.g., "output_rows_0=100, output_rows_1=150, elapsed_compute_0=50ns, elapsed_compute_1=100ns"
465///
466/// For a non-distributed plan, this is equivalent to using [ShowMetrics::Aggregated] /
467/// [DisplayableExecutionPlan::with_metrics] which aggregates, sorts, removes timestamps, and finally formats
468/// the metrics.
469///
470/// See
471/// https://github.com/apache/datafusion/blob/b463a9f9e3c9603eb2db7113125fea3a1b7f5455/datafusion/physical-plan/src/display.rs#L421.
472fn format_metrics_by_task(metrics: &MetricsSet) -> String {
473    let aggregated = aggregate_by_task_id(metrics);
474    let sorted = sorted_for_display_by_task_id(aggregated).timestamps_removed();
475
476    sorted
477        .iter()
478        .map(|m| {
479            let name = m.value().name();
480            let task_id = m
481                .labels()
482                .iter()
483                .find(|l| l.name() == DISTRIBUTED_DATAFUSION_TASK_ID_LABEL)
484                .map(|l| l.value());
485
486            match task_id {
487                Some(id) => format!("{name}_{id}={}", m.value()),
488                None => format!("{name}={}", m.value()),
489            }
490        })
491        .collect::<Vec<_>>()
492        .join(", ")
493}
494
/// Builds the `, metrics=[...]` suffix appended to nodes in plan displays from an already
/// formatted metrics string. `None` and the empty string both render as `, metrics=[]`.
fn metrics_suffix(formatted: Option<String>) -> String {
    match formatted.as_deref() {
        None | Some("") => ", metrics=[]".to_string(),
        Some(s) => format!(", metrics=[{s}]"),
    }
}
503
504/// Splits a [MetricsSet] into a map from task index to the metrics belonging to that task.
505/// Only metrics that carry a [DISTRIBUTED_DATAFUSION_TASK_ID_LABEL] are included; metrics without
506/// that label are dropped. Returns an empty map when no task-labelled metrics are present.
507fn metrics_by_task_id(metrics: &MetricsSet) -> HashMap<usize, MetricsSet> {
508    let mut map: HashMap<usize, MetricsSet> = HashMap::new();
509    for metric in metrics.iter() {
510        let Some(task_id) = metric
511            .labels()
512            .iter()
513            .find(|l| l.name() == DISTRIBUTED_DATAFUSION_TASK_ID_LABEL)
514            .and_then(|l| l.value().parse::<usize>().ok())
515        else {
516            continue;
517        };
518        map.entry(task_id).or_default().push(Arc::clone(metric));
519    }
520    map
521}
522
523fn format_tasks_for_stage(n_tasks: usize, head: &Arc<dyn ExecutionPlan>) -> String {
524    let partitioning = head.properties().output_partitioning();
525    let input_partitions = partitioning.partition_count();
526    let hash_shuffle = matches!(partitioning, Partitioning::Hash(_, _));
527    let mut tasks = Vec::with_capacity(n_tasks);
528    let mut off = 0;
529    for i in 0..n_tasks {
530        let end = off + input_partitions - 1;
531        let partitions = if input_partitions == 1 {
532            format!("p{off}")
533        } else {
534            format!("p{off}..p{end}")
535        };
536        tasks.push(format!("t{i}:[{partitions}]"));
537        off += if hash_shuffle { 0 } else { input_partitions }
538    }
539    format!("Tasks: {}", tasks.join(" "))
540}
541
// NUM_COLORS must agree with the number of colors in the color scheme named by COLOR_SCHEME,
// selected from https://graphviz.org/doc/info/colors.html
//
// Edge colors are emitted as `partition % NUM_COLORS + 1`, i.e. 1-based indices into the scheme.
const NUM_COLORS: usize = 6;
const COLOR_SCHEME: &str = "spectral6";
546
547/// This will render a regular or distributed datafusion plan as
548/// Graphviz dot format.
549/// You can view them on https://vis-js.com
550///
551/// Or it is often useful to experiment with plan output using
552/// https://datafusion-fiddle.vercel.app/
553pub fn display_plan_graphviz(plan: Arc<dyn ExecutionPlan>) -> Result<String> {
554    let mut f = String::new();
555
556    writeln!(
557        f,
558        "digraph G {{
559  rankdir=BT
560  edge[colorscheme={COLOR_SCHEME}, penwidth=2.0]
561  splines=false
562"
563    )?;
564
565    if plan.is::<DistributedExec>() {
566        let mut max_num = 0;
567        let mut all_stages = find_all_stages(&plan)
568            .into_iter()
569            .inspect(|v| max_num = max_num.max(v.num()))
570            .collect::<Vec<_>>();
571        let head_stage = Stage::Local(LocalStage {
572            query_id: Default::default(),
573            num: max_num + 1,
574            plan: plan.clone(),
575            tasks: 1,
576        });
577        all_stages.insert(0, &head_stage);
578
579        // draw all tasks first
580        for stage in &all_stages {
581            for i in 0..stage.task_count() {
582                let p = display_single_task(stage, i)?;
583                writeln!(f, "{p}")?;
584            }
585        }
586        // now draw edges between the tasks
587        for stage in &all_stages {
588            let Some(plan) = stage.local_plan() else {
589                continue;
590            };
591            for input_stage in find_input_stages(plan.as_ref()) {
592                for task_i in 0..stage.task_count() {
593                    for input_task_i in 0..input_stage.task_count() {
594                        let edges =
595                            display_inter_task_edges(stage, task_i, input_stage, input_task_i)?;
596                        writeln!(
597                            f,
598                            "// edges from child stage {} task {} to stage {} task {}\n {}",
599                            input_stage.num(),
600                            input_task_i,
601                            stage.num(),
602                            task_i,
603                            edges
604                        )?;
605                    }
606                }
607            }
608        }
609    } else {
610        // single plan, not a stage tree
611        writeln!(f, "node[shape=none]")?;
612        let p = display_plan(&plan, 0, 1, 0)?;
613        writeln!(f, "{p}")?;
614    }
615
616    writeln!(f, "}}")?;
617
618    Ok(f)
619}
620
621fn display_single_task(stage: &Stage, task_i: usize) -> Result<String> {
622    let Some(plan) = stage.local_plan() else {
623        return config_err!("plan not present");
624    };
625    let partition_group =
626        build_partition_group(task_i, plan.output_partitioning().partition_count());
627
628    let mut f = String::new();
629    writeln!(
630        f,
631        "
632  subgraph \"cluster_stage_{}_task_{}_margin\" {{
633    style=invis
634    margin=20.0
635  subgraph \"cluster_stage_{}_task_{}\" {{
636    color=blue
637    style=dotted
638    label = \"Stage {} Task {} Partitions {}\"
639    labeljust=r
640    labelloc=b
641
642    node[shape=none]
643
644",
645        stage.num(),
646        task_i,
647        stage.num(),
648        task_i,
649        stage.num(),
650        task_i,
651        format_pg(&partition_group)
652    )?;
653
654    writeln!(
655        f,
656        "{}",
657        display_plan(plan, task_i, stage.task_count(), stage.num())?
658    )?;
659    writeln!(f, "  }}")?;
660    writeln!(f, "  }}")?;
661
662    Ok(f)
663}
664
665fn display_plan(
666    plan: &Arc<dyn ExecutionPlan>,
667    task_i: usize,
668    _n_tasks: usize,
669    stage_num: usize,
670) -> Result<String> {
671    // draw all plans
672    // we need to label the nodes including depth to uniquely identify them within this task
673    // the tree node API provides depth first traversal, but we need breadth to align with
674    // how we will draw edges below, so we'll do that.
675    let mut queue = VecDeque::from([plan]);
676    let mut node_index = 0;
677
678    let mut f = String::new();
679    while let Some(plan) = queue.pop_front() {
680        node_index += 1;
681        let p = display_single_plan(plan.as_ref(), stage_num, task_i, node_index)?;
682        writeln!(f, "{p}")?;
683
684        if plan.is_network_boundary() {
685            continue;
686        }
687        for child in plan.children().iter() {
688            queue.push_back(child);
689        }
690    }
691
692    // draw edges between the plan nodes
693    type PlanWithParent<'a> = (
694        &'a Arc<dyn ExecutionPlan>,
695        Option<&'a Arc<dyn ExecutionPlan>>,
696        usize,
697    );
698    let mut queue: VecDeque<PlanWithParent> = VecDeque::from([(plan, None, 0usize)]);
699    node_index = 0;
700    while let Some((plan, maybe_parent, parent_idx)) = queue.pop_front() {
701        node_index += 1;
702        if let Some(parent) = maybe_parent {
703            let output_partitions = plan.output_partitioning().partition_count();
704
705            for i in 0..output_partitions {
706                let style = "";
707
708                writeln!(
709                    f,
710                    "  {}_{}_{}_{}:t{}:n -> {}_{}_{}_{}:b{}:s {}[color={}]",
711                    plan.name(),
712                    stage_num,
713                    task_i,
714                    node_index,
715                    i,
716                    parent.name(),
717                    stage_num,
718                    task_i,
719                    parent_idx,
720                    i,
721                    style,
722                    i % NUM_COLORS + 1
723                )?;
724            }
725        }
726
727        if plan.as_ref().is_network_boundary() {
728            continue;
729        }
730
731        for child in plan.children() {
732            queue.push_back((child, Some(plan), node_index));
733        }
734    }
735    Ok(f)
736}
737
738/// We want to display a single plan as a three row table with the top and bottom being
739/// graphvis ports.
740///
741/// We accept an index to make the node name unique in the graphviz output within
742/// a plan at the same depth
743///
744/// An example of such a node would be:
745///
746/// ```text
747///       NetworkShuffleExec [label=<
748///     <TABLE BORDER="0" CELLBORDER="0" CELLSPACING="0" CELLPADDING="0">
749///         <TR>
750///             <TD CELLBORDER="0">
751///                 <TABLE BORDER="0" CELLBORDER="1" CELLSPACING="0">
752///                     <TR>
753///                         <TD PORT="t1"></TD>
754///                         <TD PORT="t2"></TD>
755///                     </TR>
756///                 </TABLE>
757///             </TD>
758///         </TR>
759///         <TR>
760///             <TD BORDER="0" CELLPADDING="0" CELLSPACING="0">
761///                 <TABLE BORDER="0" CELLBORDER="1" CELLSPACING="0">
762///                     <TR>
763///                         <TD>NetworkShuffleExec</TD>
764///                     </TR>
765///                 </TABLE>
766///             </TD>
767///         </TR>
768///         <TR>
769///             <TD CELLBORDER="0">
770///                 <TABLE BORDER="0" CELLBORDER="1" CELLSPACING="0">
771///                     <TR>
772///                         <TD PORT="b1"></TD>
773///                         <TD PORT="b2"></TD>
774///                     </TR>
775///                 </TABLE>
776///             </TD>
777///         </TR>
778///     </TABLE>
779/// >];
780/// ```
781pub fn display_single_plan(
782    plan: &(dyn ExecutionPlan + 'static),
783    stage_num: usize,
784    task_i: usize,
785    node_index: usize,
786) -> Result<String> {
787    let mut f = String::new();
788    let output_partitions = plan.output_partitioning().partition_count();
789    let input_partitions = if plan.is_network_boundary() {
790        output_partitions
791    } else if let Some(child) = plan.children().first() {
792        child.output_partitioning().partition_count()
793    } else {
794        1
795    };
796
797    writeln!(
798        f,
799        "
800    {}_{}_{}_{} [label=<
801    <TABLE BORDER='0' CELLBORDER='0' CELLSPACING='0' CELLPADDING='0'>
802        <TR>
803            <TD CELLBORDER='0'>
804                <TABLE BORDER='0' CELLBORDER='1' CELLSPACING='0'>
805                    <TR>",
806        plan.name(),
807        stage_num,
808        task_i,
809        node_index
810    )?;
811
812    for i in 0..output_partitions {
813        writeln!(f, "                        <TD PORT='t{i}'></TD>")?;
814    }
815
816    writeln!(
817        f,
818        "                   </TR>
819                </TABLE>
820            </TD>
821        </TR>
822        <TR>
823            <TD BORDER='0' CELLPADDING='0' CELLSPACING='0'>
824                <TABLE BORDER='0' CELLBORDER='1' CELLSPACING='0'>
825                    <TR>
826                        <TD>{}</TD>
827                    </TR>
828                </TABLE>
829            </TD>
830        </TR>
831        <TR>
832            <TD CELLBORDER='0'>
833                <TABLE BORDER='0' CELLBORDER='1' CELLSPACING='0'>
834                    <TR>",
835        plan.name()
836    )?;
837
838    for i in 0..input_partitions {
839        writeln!(f, "                        <TD PORT='b{i}'></TD>")?;
840    }
841
842    writeln!(
843        f,
844        "                   </TR>
845                </TABLE>
846            </TD>
847        </TR>
848    </TABLE>
849  >];
850"
851    )?;
852    Ok(f)
853}
854
855fn display_inter_task_edges(
856    stage: &Stage,
857    task_i: usize,
858    input_stage: &Stage,
859    input_task_i: usize,
860) -> Result<String> {
861    let Some(plan) = stage.local_plan() else {
862        return plan_err!("The inner plan of a stage was encoded.");
863    };
864    let Some(input_plan) = input_stage.local_plan() else {
865        return plan_err!("The inner plan of a stage was encoded.");
866    };
867    let mut f = String::new();
868
869    let mut queue = VecDeque::from([plan]);
870    let mut index = 0;
871    while let Some(plan) = queue.pop_front() {
872        index += 1;
873        if let Some(node) = plan.downcast_ref::<NetworkShuffleExec>() {
874            if node.input_stage().num() != input_stage.num() {
875                continue;
876            }
877            // draw the edges to this node pulling data up from its child
878            let output_partitions = plan.output_partitioning().partition_count();
879            for p in 0..output_partitions {
880                writeln!(
881                    f,
882                    "  {}_{}_{}_{}:t{}:n -> {}_{}_{}_{}:b{}:s [color={}]",
883                    input_plan.name(),
884                    input_stage.num(),
885                    input_task_i,
886                    1, // the repartition exec is always the first node in the plan
887                    p + (task_i * output_partitions),
888                    plan.name(),
889                    stage.num(),
890                    task_i,
891                    index,
892                    p,
893                    p % NUM_COLORS + 1
894                )?;
895            }
896            continue;
897        } else if let Some(node) = plan.downcast_ref::<NetworkCoalesceExec>() {
898            if node.input_stage().num() != input_stage.num() {
899                continue;
900            }
901            // draw the edges to this node pulling data up from its child
902            let output_partitions = plan.output_partitioning().partition_count();
903            let input_partitions_per_task = output_partitions / input_stage.task_count();
904            for p in 0..input_partitions_per_task {
905                writeln!(
906                    f,
907                    "  {}_{}_{}_{}:t{}:n -> {}_{}_{}_{}:b{}:s [color={}]",
908                    input_plan.name(),
909                    input_stage.num(),
910                    input_task_i,
911                    1, // the repartition exec is always the first node in the plan
912                    p,
913                    plan.name(),
914                    stage.num(),
915                    task_i,
916                    index,
917                    p + (input_task_i * input_partitions_per_task),
918                    p % NUM_COLORS + 1
919                )?;
920            }
921            continue;
922        }
923
924        for child in plan.children() {
925            queue.push_back(child);
926        }
927    }
928
929    Ok(f)
930}
931
/// Formats a partition group as its partition numbers joined by underscores, e.g. `4_5_6`.
/// An empty group yields an empty string.
fn format_pg(partition_group: &[usize]) -> String {
    let labels: Vec<String> = partition_group.iter().map(ToString::to_string).collect();
    labels.join("_")
}
939
/// Returns the partition numbers assigned to task `task_i` when every task owns `partitions`
/// consecutive partitions: `task_i * partitions` up to, but excluding,
/// `(task_i + 1) * partitions`.
fn build_partition_group(task_i: usize, partitions: usize) -> Vec<usize> {
    let first = task_i * partitions;
    let past_last = (task_i + 1) * partitions;
    (first..past_last).collect()
}
943
944fn find_input_stages(plan: &dyn ExecutionPlan) -> Vec<&Stage> {
945    let mut result = vec![];
946    for child in plan.children() {
947        if let Some(plan) = child.as_network_boundary() {
948            result.push(plan.input_stage());
949        } else {
950            result.extend(find_input_stages(child.as_ref()));
951        }
952    }
953    result
954}
955
956pub(crate) fn find_all_stages(plan: &Arc<dyn ExecutionPlan>) -> Vec<&Stage> {
957    let mut result = vec![];
958    if let Some(plan) = plan.as_network_boundary() {
959        result.push(plan.input_stage());
960    }
961    for child in plan.children() {
962        result.extend(find_all_stages(child));
963    }
964    result
965}