use crate::execution_plans::{DistributedExec, NetworkCoalesceExec};
use crate::metrics::DISTRIBUTED_DATAFUSION_TASK_ID_LABEL;
use crate::{NetworkShuffleExec, PartitionIsolatorExec};
use datafusion::common::plan_err;
use datafusion::common::{HashMap, config_err};
use datafusion::error::Result;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion::physical_plan::metrics::{Label, Metric, MetricsSet};
use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties, displayable};
use itertools::Either;
use std::collections::VecDeque;
use std::sync::Arc;
use url::Url;
use uuid::Uuid;
#[derive(Debug, Clone)]
pub struct Stage {
pub(crate) query_id: Uuid,
pub(crate) num: usize,
pub(crate) plan: Option<Arc<dyn ExecutionPlan>>,
pub tasks: Vec<ExecutionTask>,
}
#[derive(Debug, Clone)]
pub struct ExecutionTask {
pub(crate) url: Option<Url>,
}
#[derive(Debug, Clone, PartialEq)]
pub struct DistributedTaskContext {
pub task_index: usize,
pub task_count: usize,
}
impl DistributedTaskContext {
pub fn from_ctx(ctx: &Arc<TaskContext>) -> Arc<Self> {
ctx.session_config()
.get_extension::<Self>()
.unwrap_or(Arc::new(DistributedTaskContext {
task_index: 0,
task_count: 1,
}))
}
}
impl Stage {
pub(crate) fn new(
query_id: Uuid,
num: usize,
plan: Arc<dyn ExecutionPlan>,
n_tasks: usize,
) -> Self {
Self {
query_id,
num,
plan: Some(plan),
tasks: vec![ExecutionTask { url: None }; n_tasks],
}
}
}
use crate::{DistributedMetricsFormat, rewrite_distributed_plan_with_metrics};
use crate::{NetworkBoundary, NetworkBoundaryExt};
use datafusion::common::DataFusionError;
use datafusion::physical_expr::Partitioning;
use std::fmt::Write;
pub fn explain_analyze(
executed: Arc<dyn ExecutionPlan>,
format: DistributedMetricsFormat,
) -> Result<String, DataFusionError> {
match executed.as_any().downcast_ref::<DistributedExec>() {
None => Ok(DisplayableExecutionPlan::with_metrics(executed.as_ref())
.indent(true)
.to_string()),
Some(_) => {
let executed = rewrite_distributed_plan_with_metrics(executed.clone(), format)?;
Ok(display_plan_ascii(executed.as_ref(), true))
}
}
}
const LTCORNER: &str = "┌"; const LDCORNER: &str = "└"; const VERTICAL: &str = "│"; const HORIZONTAL: &str = "─"; pub fn display_plan_ascii(plan: &dyn ExecutionPlan, show_metrics: bool) -> String {
if let Some(plan) = plan.as_any().downcast_ref::<DistributedExec>() {
let mut f = String::new();
display_ascii(Either::Left(plan), 0, show_metrics, &mut f).unwrap();
f
} else {
match show_metrics {
true => DisplayableExecutionPlan::with_metrics(plan)
.indent(true)
.to_string(),
false => displayable(plan).indent(true).to_string(),
}
}
}
fn display_ascii(
stage: Either<&DistributedExec, &Stage>,
depth: usize,
show_metrics: bool,
f: &mut String,
) -> std::fmt::Result {
let plan = match stage {
Either::Left(distributed_exec) => distributed_exec.children().first().unwrap(),
Either::Right(stage) => {
let Some(plan) = &stage.plan else {
return write!(f, "StageExec: encoded input plan");
};
plan
}
};
match stage {
Either::Left(dist_exec) => {
writeln!(
f,
"{}{}{} DistributedExec {} {}{}",
" ".repeat(depth),
LTCORNER,
HORIZONTAL.repeat(5),
HORIZONTAL.repeat(2),
format_tasks_for_stage(1, plan),
if show_metrics {
format_metrics_by_task(&dist_exec.metrics().unwrap_or_default())
} else {
"".into()
}
)?;
}
Either::Right(stage) => {
writeln!(
f,
"{}{}{} Stage {} {} {}",
" ".repeat(depth),
LTCORNER,
HORIZONTAL.repeat(5),
stage.num,
HORIZONTAL.repeat(2),
format_tasks_for_stage(stage.tasks.len(), plan)
)?;
}
}
let mut plan_str = String::new();
display_inner_ascii(plan, 0, show_metrics, &mut plan_str)?;
let plan_str = plan_str
.split('\n')
.filter(|v| !v.is_empty())
.collect::<Vec<_>>()
.join(&format!("\n{}{}", " ".repeat(depth), VERTICAL));
writeln!(f, "{}{}{}", " ".repeat(depth), VERTICAL, plan_str)?;
writeln!(
f,
"{}{}{}",
" ".repeat(depth),
LDCORNER,
HORIZONTAL.repeat(50)
)?;
for input_stage in find_input_stages(plan.as_ref()) {
display_ascii(Either::Right(input_stage), depth + 1, show_metrics, f)?;
}
Ok(())
}
fn display_inner_ascii(
plan: &Arc<dyn ExecutionPlan>,
indent: usize,
show_metrics: bool,
f: &mut String,
) -> std::fmt::Result {
let metrics_str = if show_metrics {
if let Some(metrics) = plan.metrics() {
let formatted = format_metrics_by_task(&metrics);
if formatted.is_empty() {
", metrics=[]".to_string()
} else {
format!(", metrics=[{formatted}]")
}
} else {
", metrics=[]".to_string()
}
} else {
String::new()
};
let node_str = displayable(plan.as_ref()).one_line().to_string();
writeln!(
f,
"{} {}{metrics_str}",
" ".repeat(indent),
node_str.trim_end() )?;
if plan.is_network_boundary() {
return Ok(());
}
for child in plan.children() {
display_inner_ascii(child, indent + 2, show_metrics, f)?;
}
Ok(())
}
fn aggregate_by_task_id(metrics: &MetricsSet) -> MetricsSet {
let mut map: HashMap<(String, Option<String>), Metric> = HashMap::new();
for metric in metrics.iter() {
let name = metric.value().name().to_string();
let task_id = metric
.labels()
.iter()
.find(|l| l.name() == DISTRIBUTED_DATAFUSION_TASK_ID_LABEL)
.map(|l| l.value().to_string());
let key = (name, task_id.clone());
map.entry(key)
.and_modify(|accum| {
accum.value_mut().aggregate(metric.value());
})
.or_insert_with(|| {
let labels = task_id
.map(|id| vec![Label::new(DISTRIBUTED_DATAFUSION_TASK_ID_LABEL, id)])
.unwrap_or_default();
let mut accum = Metric::new_with_labels(
metric.value().new_empty(),
None, labels,
);
accum.value_mut().aggregate(metric.value());
accum
});
}
let mut result = MetricsSet::new();
for (_, metric) in map {
result.push(Arc::new(metric));
}
result
}
fn sorted_for_display_by_task_id(metrics: MetricsSet) -> MetricsSet {
let mut vec: Vec<Arc<Metric>> = metrics.iter().cloned().collect();
vec.sort_unstable_by_key(|metric| {
let task_id = metric
.labels()
.iter()
.find(|l| l.name() == DISTRIBUTED_DATAFUSION_TASK_ID_LABEL)
.and_then(|l| l.value().parse::<u64>().ok());
(
metric.value().display_sort_key(),
metric.value().name().to_owned(),
task_id,
)
});
let mut result = MetricsSet::new();
for m in vec {
result.push(m);
}
result
}
fn format_metrics_by_task(metrics: &MetricsSet) -> String {
let aggregated = aggregate_by_task_id(metrics);
let sorted = sorted_for_display_by_task_id(aggregated).timestamps_removed();
sorted
.iter()
.map(|m| {
let name = m.value().name();
let task_id = m
.labels()
.iter()
.find(|l| l.name() == DISTRIBUTED_DATAFUSION_TASK_ID_LABEL)
.map(|l| l.value());
match task_id {
Some(id) => format!("{name}_{id}={}", m.value()),
None => format!("{name}={}", m.value()),
}
})
.collect::<Vec<_>>()
.join(", ")
}
fn format_tasks_for_stage(n_tasks: usize, head: &Arc<dyn ExecutionPlan>) -> String {
let partitioning = head.properties().output_partitioning();
let input_partitions = partitioning.partition_count();
let hash_shuffle = matches!(partitioning, Partitioning::Hash(_, _));
let mut result = "Tasks: ".to_string();
let mut off = 0;
for i in 0..n_tasks {
result += &format!("t{i}:[");
let end = off + input_partitions - 1;
if input_partitions == 1 {
result += &format!("p{off}");
} else {
result += &format!("p{off}..p{end}");
}
result += "] ";
off += if hash_shuffle { 0 } else { input_partitions }
}
result
}
const NUM_COLORS: usize = 6;
const COLOR_SCHEME: &str = "spectral6";
pub fn display_plan_graphviz(plan: Arc<dyn ExecutionPlan>) -> Result<String> {
let mut f = String::new();
writeln!(
f,
"digraph G {{
rankdir=BT
edge[colorscheme={COLOR_SCHEME}, penwidth=2.0]
splines=false
"
)?;
if plan.as_any().is::<DistributedExec>() {
let mut max_num = 0;
let mut all_stages = find_all_stages(&plan)
.into_iter()
.inspect(|v| max_num = max_num.max(v.num))
.collect::<Vec<_>>();
let head_stage = Stage {
query_id: Default::default(),
num: max_num + 1,
plan: Some(plan.clone()),
tasks: vec![ExecutionTask { url: None }],
};
all_stages.insert(0, &head_stage);
for stage in &all_stages {
for i in 0..stage.tasks.iter().len() {
let p = display_single_task(stage, i)?;
writeln!(f, "{p}")?;
}
}
for stage in &all_stages {
let Some(plan) = &stage.plan else { continue };
for input_stage in find_input_stages(plan.as_ref()) {
for task_i in 0..stage.tasks.len() {
for input_task_i in 0..input_stage.tasks.len() {
let edges =
display_inter_task_edges(stage, task_i, input_stage, input_task_i)?;
writeln!(
f,
"// edges from child stage {} task {} to stage {} task {}\n {}",
input_stage.num, input_task_i, stage.num, task_i, edges
)?;
}
}
}
}
} else {
writeln!(f, "node[shape=none]")?;
let p = display_plan(&plan, 0, 1, 0)?;
writeln!(f, "{p}")?;
}
writeln!(f, "}}")?;
Ok(f)
}
fn display_single_task(stage: &Stage, task_i: usize) -> Result<String> {
let Some(plan) = &stage.plan else {
return config_err!("plan not present");
};
let partition_group =
build_partition_group(task_i, plan.output_partitioning().partition_count());
let mut f = String::new();
writeln!(
f,
"
subgraph \"cluster_stage_{}_task_{}_margin\" {{
style=invis
margin=20.0
subgraph \"cluster_stage_{}_task_{}\" {{
color=blue
style=dotted
label = \"Stage {} Task {} Partitions {}\"
labeljust=r
labelloc=b
node[shape=none]
",
stage.num,
task_i,
stage.num,
task_i,
stage.num,
task_i,
format_pg(&partition_group)
)?;
writeln!(
f,
"{}",
display_plan(plan, task_i, stage.tasks.len(), stage.num)?
)?;
writeln!(f, " }}")?;
writeln!(f, " }}")?;
Ok(f)
}
fn display_plan(
plan: &Arc<dyn ExecutionPlan>,
task_i: usize,
n_tasks: usize,
stage_num: usize,
) -> Result<String> {
let mut queue = VecDeque::from([plan]);
let mut node_index = 0;
let mut f = String::new();
while let Some(plan) = queue.pop_front() {
node_index += 1;
let p = display_single_plan(plan.as_ref(), stage_num, task_i, node_index)?;
writeln!(f, "{p}")?;
if plan.is_network_boundary() {
continue;
}
for child in plan.children().iter() {
queue.push_back(child);
}
}
type PlanWithParent<'a> = (
&'a Arc<dyn ExecutionPlan>,
Option<&'a Arc<dyn ExecutionPlan>>,
usize,
);
let mut queue: VecDeque<PlanWithParent> = VecDeque::from([(plan, None, 0usize)]);
let mut isolator_partition_group = None;
node_index = 0;
while let Some((plan, maybe_parent, parent_idx)) = queue.pop_front() {
node_index += 1;
if let Some(node) = plan.as_any().downcast_ref::<PartitionIsolatorExec>() {
isolator_partition_group = Some(PartitionIsolatorExec::partition_group(
node.input.output_partitioning().partition_count(),
task_i,
n_tasks,
));
}
if let Some(parent) = maybe_parent {
let output_partitions = plan.output_partitioning().partition_count();
for i in 0..output_partitions {
let mut style = "";
if plan.as_any().is::<PartitionIsolatorExec>() {
if i >= isolator_partition_group.as_ref().map_or(0, |v| v.len()) {
style = "[style=dotted, label=empty]";
}
} else if let Some(partition_group) = &isolator_partition_group
&& !partition_group.contains(&i)
{
style = "[style=invis]";
}
writeln!(
f,
" {}_{}_{}_{}:t{}:n -> {}_{}_{}_{}:b{}:s {}[color={}]",
plan.name(),
stage_num,
task_i,
node_index,
i,
parent.name(),
stage_num,
task_i,
parent_idx,
i,
style,
i % NUM_COLORS + 1
)?;
}
}
if plan.as_ref().is_network_boundary() {
continue;
}
for child in plan.children() {
queue.push_back((child, Some(plan), node_index));
}
}
Ok(f)
}
pub fn display_single_plan(
plan: &(dyn ExecutionPlan + 'static),
stage_num: usize,
task_i: usize,
node_index: usize,
) -> Result<String> {
let mut f = String::new();
let output_partitions = plan.output_partitioning().partition_count();
let input_partitions = if plan.is_network_boundary() {
output_partitions
} else if let Some(child) = plan.children().first() {
child.output_partitioning().partition_count()
} else {
1
};
writeln!(
f,
"
{}_{}_{}_{} [label=<
<TABLE BORDER='0' CELLBORDER='0' CELLSPACING='0' CELLPADDING='0'>
<TR>
<TD CELLBORDER='0'>
<TABLE BORDER='0' CELLBORDER='1' CELLSPACING='0'>
<TR>",
plan.name(),
stage_num,
task_i,
node_index
)?;
for i in 0..output_partitions {
writeln!(f, " <TD PORT='t{i}'></TD>")?;
}
writeln!(
f,
" </TR>
</TABLE>
</TD>
</TR>
<TR>
<TD BORDER='0' CELLPADDING='0' CELLSPACING='0'>
<TABLE BORDER='0' CELLBORDER='1' CELLSPACING='0'>
<TR>
<TD>{}</TD>
</TR>
</TABLE>
</TD>
</TR>
<TR>
<TD CELLBORDER='0'>
<TABLE BORDER='0' CELLBORDER='1' CELLSPACING='0'>
<TR>",
plan.name()
)?;
for i in 0..input_partitions {
writeln!(f, " <TD PORT='b{i}'></TD>")?;
}
writeln!(
f,
" </TR>
</TABLE>
</TD>
</TR>
</TABLE>
>];
"
)?;
Ok(f)
}
fn display_inter_task_edges(
stage: &Stage,
task_i: usize,
input_stage: &Stage,
input_task_i: usize,
) -> Result<String> {
let Some(plan) = &stage.plan else {
return plan_err!("The inner plan of a stage was encoded.");
};
let Some(input_plan) = &input_stage.plan else {
return plan_err!("The inner plan of a stage was encoded.");
};
let mut f = String::new();
let mut queue = VecDeque::from([plan]);
let mut index = 0;
while let Some(plan) = queue.pop_front() {
index += 1;
if let Some(node) = plan.as_any().downcast_ref::<NetworkShuffleExec>() {
if node.input_stage().num != input_stage.num {
continue;
}
let output_partitions = plan.output_partitioning().partition_count();
for p in 0..output_partitions {
writeln!(
f,
" {}_{}_{}_{}:t{}:n -> {}_{}_{}_{}:b{}:s [color={}]",
input_plan.name(),
input_stage.num,
input_task_i,
1, p + (task_i * output_partitions),
plan.name(),
stage.num,
task_i,
index,
p,
p % NUM_COLORS + 1
)?;
}
continue;
} else if let Some(node) = plan.as_any().downcast_ref::<NetworkCoalesceExec>() {
if node.input_stage().num != input_stage.num {
continue;
}
let output_partitions = plan.output_partitioning().partition_count();
let input_partitions_per_task = output_partitions / input_stage.tasks.len();
for p in 0..input_partitions_per_task {
writeln!(
f,
" {}_{}_{}_{}:t{}:n -> {}_{}_{}_{}:b{}:s [color={}]",
input_plan.name(),
input_stage.num,
input_task_i,
1, p,
plan.name(),
stage.num,
task_i,
index,
p + (input_task_i * input_partitions_per_task),
p % NUM_COLORS + 1
)?;
}
continue;
}
for child in plan.children() {
queue.push_back(child);
}
}
Ok(f)
}
fn format_pg(partition_group: &[usize]) -> String {
partition_group
.iter()
.map(|pg| format!("{pg}"))
.collect::<Vec<_>>()
.join("_")
}
fn build_partition_group(task_i: usize, partitions: usize) -> Vec<usize> {
((task_i * partitions)..((task_i + 1) * partitions)).collect::<Vec<_>>()
}
fn find_input_stages(plan: &dyn ExecutionPlan) -> Vec<&Stage> {
let mut result = vec![];
for child in plan.children() {
if let Some(plan) = child.as_network_boundary() {
result.push(plan.input_stage());
} else {
result.extend(find_input_stages(child.as_ref()));
}
}
result
}
pub(crate) fn find_all_stages(plan: &Arc<dyn ExecutionPlan>) -> Vec<&Stage> {
let mut result = vec![];
if let Some(plan) = plan.as_network_boundary() {
result.push(plan.input_stage());
}
for child in plan.children() {
result.extend(find_all_stages(child));
}
result
}