use crate::coordinator::{DistributedExec, MetricsStore};
use crate::execution_plans::{DistributedLeafExec, NetworkCoalesceExec};
use crate::metrics::DISTRIBUTED_DATAFUSION_TASK_ID_LABEL;
use datafusion::common::{HashMap, config_err};
use datafusion::common::{exec_err, plan_err};
use datafusion::error::Result;
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion::physical_plan::metrics::{Label, Metric, MetricsSet};
use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties, displayable};
use itertools::Either;
use std::collections::VecDeque;
use std::sync::Arc;
use url::Url;
use uuid::Uuid;
#[derive(Debug, Clone)]
pub enum Stage {
Local(LocalStage),
Remote(RemoteStage),
}
#[derive(Debug, Clone)]
pub struct LocalStage {
pub query_id: Uuid,
pub num: usize,
pub plan: Arc<dyn ExecutionPlan>,
pub tasks: usize,
}
impl LocalStage {
pub fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
if self.tasks > 1 {
return exec_err!("Cannot execute a local stage with more than 1 task");
}
self.plan.execute(partition, context)
}
}
#[derive(Debug, Clone)]
pub struct RemoteStage {
pub query_id: Uuid,
pub num: usize,
pub workers: Vec<Url>,
}
impl Stage {
pub fn query_id(&self) -> Uuid {
match &self {
Self::Local(v) => v.query_id,
Self::Remote(v) => v.query_id,
}
}
pub fn num(&self) -> usize {
match &self {
Self::Local(v) => v.num,
Self::Remote(v) => v.num,
}
}
pub fn task_count(&self) -> usize {
match &self {
Self::Local(v) => v.tasks,
Self::Remote(v) => v.workers.len(),
}
}
pub fn local_plan(&self) -> Option<&Arc<dyn ExecutionPlan>> {
match &self {
Self::Local(v) => Some(&v.plan),
Self::Remote(_) => None,
}
}
}
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct DistributedTaskContext {
pub task_index: usize,
pub task_count: usize,
}
impl DistributedTaskContext {
pub fn from_ctx(ctx: &Arc<TaskContext>) -> Arc<Self> {
ctx.session_config()
.get_extension::<Self>()
.unwrap_or(Arc::new(DistributedTaskContext {
task_index: 0,
task_count: 1,
}))
}
}
use crate::common::serialize_uuid;
use crate::metrics::proto::metric_proto_to_df;
use crate::worker::generated::worker as pb;
use crate::{DistributedMetricsFormat, NetworkShuffleExec, rewrite_distributed_plan_with_metrics};
use crate::{NetworkBoundary, NetworkBoundaryExt};
use datafusion::common::DataFusionError;
use datafusion::physical_expr::Partitioning;
use std::fmt::Write;
pub async fn explain_analyze(
executed: Arc<dyn ExecutionPlan>,
format: DistributedMetricsFormat,
) -> Result<String, DataFusionError> {
match executed.downcast_ref::<DistributedExec>() {
None => Ok(DisplayableExecutionPlan::with_metrics(executed.as_ref())
.indent(true)
.to_string()),
Some(_) => {
let executed = rewrite_distributed_plan_with_metrics(executed.clone(), format).await?;
Ok(display_plan_ascii(executed.as_ref(), true))
}
}
}
const LTCORNER: &str = "┌"; const LDCORNER: &str = "└"; const VERTICAL: &str = "│"; const HORIZONTAL: &str = "─"; pub fn display_plan_ascii(plan: &dyn ExecutionPlan, show_metrics: bool) -> String {
if let Some(plan) = plan.downcast_ref::<DistributedExec>() {
let mut f = String::new();
display_ascii(plan, Either::Left(plan), 0, show_metrics, &mut f).unwrap();
f
} else {
match show_metrics {
true => DisplayableExecutionPlan::with_metrics(plan)
.indent(true)
.to_string(),
false => displayable(plan).indent(true).to_string(),
}
}
}
fn display_ascii(
root: &DistributedExec,
stage: Either<&DistributedExec, &Stage>,
depth: usize,
show_metrics: bool,
f: &mut String,
) -> std::fmt::Result {
let plan = match stage {
Either::Left(distributed_exec) => distributed_exec.children().first().unwrap(),
Either::Right(stage) => {
let Some(plan) = stage.local_plan() else {
return write!(f, "StageExec: encoded input plan");
};
plan
}
};
match stage {
Either::Left(dist_exec) => {
write!(
f,
"{}{}{} DistributedExec {} {}",
" ".repeat(depth),
LTCORNER,
HORIZONTAL.repeat(5),
HORIZONTAL.repeat(2),
format_tasks_for_stage(1, plan),
)?;
if show_metrics && let Some(metrics) = dist_exec.metrics() {
write!(f, " ")?;
writeln!(f, "{}", format_metrics_by_task(&metrics))?;
} else {
writeln!(f)?;
}
}
Either::Right(stage) => {
write!(
f,
"{}{}{} Stage {} {} {}",
" ".repeat(depth),
LTCORNER,
HORIZONTAL.repeat(5),
stage.num(),
HORIZONTAL.repeat(2),
format_tasks_for_stage(stage.task_count(), plan)
)?;
if show_metrics && let Some(metrics_store) = &root.metrics_store {
let metrics = gather_stage_header_metrics(stage, metrics_store);
write!(f, " ")?;
writeln!(f, "{}", format_metrics_by_task(&metrics))?;
} else {
writeln!(f)?;
}
}
}
let mut plan_str = String::new();
display_inner_ascii(plan, 0, show_metrics, &mut plan_str)?;
let plan_str = plan_str
.split('\n')
.filter(|v| !v.is_empty())
.collect::<Vec<_>>()
.join(&format!("\n{}{}", " ".repeat(depth), VERTICAL));
writeln!(f, "{}{}{}", " ".repeat(depth), VERTICAL, plan_str)?;
writeln!(
f,
"{}{}{}",
" ".repeat(depth),
LDCORNER,
HORIZONTAL.repeat(50)
)?;
for input_stage in find_input_stages(plan.as_ref()) {
display_ascii(root, Either::Right(input_stage), depth + 1, show_metrics, f)?;
}
Ok(())
}
fn display_inner_ascii(
plan: &Arc<dyn ExecutionPlan>,
indent: usize,
show_metrics: bool,
f: &mut String,
) -> std::fmt::Result {
if plan.is::<DistributedLeafExec>() {
return display_inner_distributed_leaf(plan, indent, show_metrics, f);
}
let node_str = displayable(plan.as_ref()).one_line().to_string();
let metrics_str = match show_metrics {
true => metrics_suffix(plan.metrics().map(|m| format_metrics_by_task(&m))),
false => String::new(),
};
writeln!(
f,
"{} {}{metrics_str}",
" ".repeat(indent),
node_str.trim_end() )?;
if plan.is_network_boundary() {
return Ok(());
}
for child in plan.children() {
display_inner_ascii(child, indent + 2, show_metrics, f)?;
}
Ok(())
}
fn display_inner_distributed_leaf(
plan: &Arc<dyn ExecutionPlan>,
indent: usize,
show_metrics: bool,
f: &mut String,
) -> std::fmt::Result {
let Some(leaf) = plan.downcast_ref::<DistributedLeafExec>() else {
return Ok(());
};
let indent = " ".repeat(indent);
if let Some(by_task) = show_metrics
.then(|| plan.metrics())
.flatten()
.map(|m| metrics_by_task_id(&m))
&& !by_task.is_empty()
{
writeln!(f, "{indent} DistributedLeafExec:")?;
for (task_i, variant) in leaf.variants.iter().enumerate() {
let variant = displayable(variant.as_ref()).one_line().to_string();
let metrics = match by_task.is_empty() {
true => String::new(),
false => metrics_suffix(by_task.get(&task_i).map(format_metrics_by_task)),
};
writeln!(f, "{indent} t{task_i}: {}{metrics}", variant.trim_end())?;
}
} else {
let header = match show_metrics {
true => metrics_suffix(plan.metrics().map(|m| format_metrics_by_task(&m))),
false => String::new(),
};
writeln!(f, "{indent} DistributedLeafExec:{header}")?;
for (task_i, variant) in leaf.variants.iter().enumerate() {
let variant = displayable(variant.as_ref()).one_line().to_string();
writeln!(f, "{indent} t{task_i}: {}", variant.trim_end())?;
}
}
Ok(())
}
fn gather_stage_header_metrics(stage: &Stage, metrics_store: &MetricsStore) -> MetricsSet {
let mut task_key = pb::TaskKey {
query_id: serialize_uuid(&stage.query_id()),
stage_id: stage.num() as u64,
task_number: 0,
};
let mut all_metrics = MetricsSet::new();
while let Some(metrics_set) = metrics_store.get(&task_key).and_then(|v| v.task_metrics) {
for mut metric in metrics_set.metrics {
metric.labels.push(pb::Label {
name: DISTRIBUTED_DATAFUSION_TASK_ID_LABEL.to_string(),
value: task_key.task_number.to_string(),
});
if let Ok(metric) = metric_proto_to_df(metric) {
all_metrics.push(metric)
};
}
task_key.task_number += 1;
}
all_metrics
}
fn aggregate_by_task_id(metrics: &MetricsSet) -> MetricsSet {
let mut map: HashMap<(String, Option<String>), Metric> = HashMap::new();
for metric in metrics.iter() {
let name = metric.value().name().to_string();
let task_id = metric
.labels()
.iter()
.find(|l| l.name() == DISTRIBUTED_DATAFUSION_TASK_ID_LABEL)
.map(|l| l.value().to_string());
let key = (name, task_id.clone());
map.entry(key)
.and_modify(|accum| {
accum.value_mut().aggregate(metric.value());
})
.or_insert_with(|| {
let labels = task_id
.map(|id| vec![Label::new(DISTRIBUTED_DATAFUSION_TASK_ID_LABEL, id)])
.unwrap_or_default();
let mut accum = Metric::new_with_labels(
metric.value().new_empty(),
None, labels,
);
accum.value_mut().aggregate(metric.value());
accum
});
}
let mut result = MetricsSet::new();
for (_, metric) in map {
result.push(Arc::new(metric));
}
result
}
fn sorted_for_display_by_task_id(metrics: MetricsSet) -> MetricsSet {
let mut vec: Vec<Arc<Metric>> = metrics.iter().cloned().collect();
vec.sort_unstable_by_key(|metric| {
let task_id = metric
.labels()
.iter()
.find(|l| l.name() == DISTRIBUTED_DATAFUSION_TASK_ID_LABEL)
.and_then(|l| l.value().parse::<u64>().ok());
(
metric.value().display_sort_key(),
metric.value().name().to_owned(),
task_id,
)
});
let mut result = MetricsSet::new();
for m in vec {
result.push(m);
}
result
}
fn format_metrics_by_task(metrics: &MetricsSet) -> String {
let aggregated = aggregate_by_task_id(metrics);
let sorted = sorted_for_display_by_task_id(aggregated).timestamps_removed();
sorted
.iter()
.map(|m| {
let name = m.value().name();
let task_id = m
.labels()
.iter()
.find(|l| l.name() == DISTRIBUTED_DATAFUSION_TASK_ID_LABEL)
.map(|l| l.value());
match task_id {
Some(id) => format!("{name}_{id}={}", m.value()),
None => format!("{name}={}", m.value()),
}
})
.collect::<Vec<_>>()
.join(", ")
}
fn metrics_suffix(formatted: Option<String>) -> String {
match formatted.unwrap_or_default() {
s if s.is_empty() => ", metrics=[]".to_string(),
s => format!(", metrics=[{s}]"),
}
}
fn metrics_by_task_id(metrics: &MetricsSet) -> HashMap<usize, MetricsSet> {
let mut map: HashMap<usize, MetricsSet> = HashMap::new();
for metric in metrics.iter() {
let Some(task_id) = metric
.labels()
.iter()
.find(|l| l.name() == DISTRIBUTED_DATAFUSION_TASK_ID_LABEL)
.and_then(|l| l.value().parse::<usize>().ok())
else {
continue;
};
map.entry(task_id).or_default().push(Arc::clone(metric));
}
map
}
fn format_tasks_for_stage(n_tasks: usize, head: &Arc<dyn ExecutionPlan>) -> String {
let partitioning = head.properties().output_partitioning();
let input_partitions = partitioning.partition_count();
let hash_shuffle = matches!(partitioning, Partitioning::Hash(_, _));
let mut tasks = Vec::with_capacity(n_tasks);
let mut off = 0;
for i in 0..n_tasks {
let end = off + input_partitions - 1;
let partitions = if input_partitions == 1 {
format!("p{off}")
} else {
format!("p{off}..p{end}")
};
tasks.push(format!("t{i}:[{partitions}]"));
off += if hash_shuffle { 0 } else { input_partitions }
}
format!("Tasks: {}", tasks.join(" "))
}
const NUM_COLORS: usize = 6;
const COLOR_SCHEME: &str = "spectral6";
pub fn display_plan_graphviz(plan: Arc<dyn ExecutionPlan>) -> Result<String> {
let mut f = String::new();
writeln!(
f,
"digraph G {{
rankdir=BT
edge[colorscheme={COLOR_SCHEME}, penwidth=2.0]
splines=false
"
)?;
if plan.is::<DistributedExec>() {
let mut max_num = 0;
let mut all_stages = find_all_stages(&plan)
.into_iter()
.inspect(|v| max_num = max_num.max(v.num()))
.collect::<Vec<_>>();
let head_stage = Stage::Local(LocalStage {
query_id: Default::default(),
num: max_num + 1,
plan: plan.clone(),
tasks: 1,
});
all_stages.insert(0, &head_stage);
for stage in &all_stages {
for i in 0..stage.task_count() {
let p = display_single_task(stage, i)?;
writeln!(f, "{p}")?;
}
}
for stage in &all_stages {
let Some(plan) = stage.local_plan() else {
continue;
};
for input_stage in find_input_stages(plan.as_ref()) {
for task_i in 0..stage.task_count() {
for input_task_i in 0..input_stage.task_count() {
let edges =
display_inter_task_edges(stage, task_i, input_stage, input_task_i)?;
writeln!(
f,
"// edges from child stage {} task {} to stage {} task {}\n {}",
input_stage.num(),
input_task_i,
stage.num(),
task_i,
edges
)?;
}
}
}
}
} else {
writeln!(f, "node[shape=none]")?;
let p = display_plan(&plan, 0, 1, 0)?;
writeln!(f, "{p}")?;
}
writeln!(f, "}}")?;
Ok(f)
}
fn display_single_task(stage: &Stage, task_i: usize) -> Result<String> {
let Some(plan) = stage.local_plan() else {
return config_err!("plan not present");
};
let partition_group =
build_partition_group(task_i, plan.output_partitioning().partition_count());
let mut f = String::new();
writeln!(
f,
"
subgraph \"cluster_stage_{}_task_{}_margin\" {{
style=invis
margin=20.0
subgraph \"cluster_stage_{}_task_{}\" {{
color=blue
style=dotted
label = \"Stage {} Task {} Partitions {}\"
labeljust=r
labelloc=b
node[shape=none]
",
stage.num(),
task_i,
stage.num(),
task_i,
stage.num(),
task_i,
format_pg(&partition_group)
)?;
writeln!(
f,
"{}",
display_plan(plan, task_i, stage.task_count(), stage.num())?
)?;
writeln!(f, " }}")?;
writeln!(f, " }}")?;
Ok(f)
}
fn display_plan(
plan: &Arc<dyn ExecutionPlan>,
task_i: usize,
_n_tasks: usize,
stage_num: usize,
) -> Result<String> {
let mut queue = VecDeque::from([plan]);
let mut node_index = 0;
let mut f = String::new();
while let Some(plan) = queue.pop_front() {
node_index += 1;
let p = display_single_plan(plan.as_ref(), stage_num, task_i, node_index)?;
writeln!(f, "{p}")?;
if plan.is_network_boundary() {
continue;
}
for child in plan.children().iter() {
queue.push_back(child);
}
}
type PlanWithParent<'a> = (
&'a Arc<dyn ExecutionPlan>,
Option<&'a Arc<dyn ExecutionPlan>>,
usize,
);
let mut queue: VecDeque<PlanWithParent> = VecDeque::from([(plan, None, 0usize)]);
node_index = 0;
while let Some((plan, maybe_parent, parent_idx)) = queue.pop_front() {
node_index += 1;
if let Some(parent) = maybe_parent {
let output_partitions = plan.output_partitioning().partition_count();
for i in 0..output_partitions {
let style = "";
writeln!(
f,
" {}_{}_{}_{}:t{}:n -> {}_{}_{}_{}:b{}:s {}[color={}]",
plan.name(),
stage_num,
task_i,
node_index,
i,
parent.name(),
stage_num,
task_i,
parent_idx,
i,
style,
i % NUM_COLORS + 1
)?;
}
}
if plan.as_ref().is_network_boundary() {
continue;
}
for child in plan.children() {
queue.push_back((child, Some(plan), node_index));
}
}
Ok(f)
}
pub fn display_single_plan(
plan: &(dyn ExecutionPlan + 'static),
stage_num: usize,
task_i: usize,
node_index: usize,
) -> Result<String> {
let mut f = String::new();
let output_partitions = plan.output_partitioning().partition_count();
let input_partitions = if plan.is_network_boundary() {
output_partitions
} else if let Some(child) = plan.children().first() {
child.output_partitioning().partition_count()
} else {
1
};
writeln!(
f,
"
{}_{}_{}_{} [label=<
<TABLE BORDER='0' CELLBORDER='0' CELLSPACING='0' CELLPADDING='0'>
<TR>
<TD CELLBORDER='0'>
<TABLE BORDER='0' CELLBORDER='1' CELLSPACING='0'>
<TR>",
plan.name(),
stage_num,
task_i,
node_index
)?;
for i in 0..output_partitions {
writeln!(f, " <TD PORT='t{i}'></TD>")?;
}
writeln!(
f,
" </TR>
</TABLE>
</TD>
</TR>
<TR>
<TD BORDER='0' CELLPADDING='0' CELLSPACING='0'>
<TABLE BORDER='0' CELLBORDER='1' CELLSPACING='0'>
<TR>
<TD>{}</TD>
</TR>
</TABLE>
</TD>
</TR>
<TR>
<TD CELLBORDER='0'>
<TABLE BORDER='0' CELLBORDER='1' CELLSPACING='0'>
<TR>",
plan.name()
)?;
for i in 0..input_partitions {
writeln!(f, " <TD PORT='b{i}'></TD>")?;
}
writeln!(
f,
" </TR>
</TABLE>
</TD>
</TR>
</TABLE>
>];
"
)?;
Ok(f)
}
fn display_inter_task_edges(
stage: &Stage,
task_i: usize,
input_stage: &Stage,
input_task_i: usize,
) -> Result<String> {
let Some(plan) = stage.local_plan() else {
return plan_err!("The inner plan of a stage was encoded.");
};
let Some(input_plan) = input_stage.local_plan() else {
return plan_err!("The inner plan of a stage was encoded.");
};
let mut f = String::new();
let mut queue = VecDeque::from([plan]);
let mut index = 0;
while let Some(plan) = queue.pop_front() {
index += 1;
if let Some(node) = plan.downcast_ref::<NetworkShuffleExec>() {
if node.input_stage().num() != input_stage.num() {
continue;
}
let output_partitions = plan.output_partitioning().partition_count();
for p in 0..output_partitions {
writeln!(
f,
" {}_{}_{}_{}:t{}:n -> {}_{}_{}_{}:b{}:s [color={}]",
input_plan.name(),
input_stage.num(),
input_task_i,
1, p + (task_i * output_partitions),
plan.name(),
stage.num(),
task_i,
index,
p,
p % NUM_COLORS + 1
)?;
}
continue;
} else if let Some(node) = plan.downcast_ref::<NetworkCoalesceExec>() {
if node.input_stage().num() != input_stage.num() {
continue;
}
let output_partitions = plan.output_partitioning().partition_count();
let input_partitions_per_task = output_partitions / input_stage.task_count();
for p in 0..input_partitions_per_task {
writeln!(
f,
" {}_{}_{}_{}:t{}:n -> {}_{}_{}_{}:b{}:s [color={}]",
input_plan.name(),
input_stage.num(),
input_task_i,
1, p,
plan.name(),
stage.num(),
task_i,
index,
p + (input_task_i * input_partitions_per_task),
p % NUM_COLORS + 1
)?;
}
continue;
}
for child in plan.children() {
queue.push_back(child);
}
}
Ok(f)
}
fn format_pg(partition_group: &[usize]) -> String {
partition_group
.iter()
.map(|pg| format!("{pg}"))
.collect::<Vec<_>>()
.join("_")
}
fn build_partition_group(task_i: usize, partitions: usize) -> Vec<usize> {
((task_i * partitions)..((task_i + 1) * partitions)).collect::<Vec<_>>()
}
fn find_input_stages(plan: &dyn ExecutionPlan) -> Vec<&Stage> {
let mut result = vec![];
for child in plan.children() {
if let Some(plan) = child.as_network_boundary() {
result.push(plan.input_stage());
} else {
result.extend(find_input_stages(child.as_ref()));
}
}
result
}
pub(crate) fn find_all_stages(plan: &Arc<dyn ExecutionPlan>) -> Vec<&Stage> {
let mut result = vec![];
if let Some(plan) = plan.as_network_boundary() {
result.push(plan.input_stage());
}
for child in plan.children() {
result.extend(find_all_stages(child));
}
result
}