use crate::TaskCountAnnotation::{Desired, Maximum};
use crate::execution_plans::{ChildWeight, ChildrenIsolatorUnionExec};
use crate::stage::LocalStage;
use crate::{
BroadcastExec, DistributedConfig, NetworkBoundaryExt, NetworkBroadcastExec,
NetworkCoalesceExec, NetworkShuffleExec, Stage, TaskCountAnnotation, TaskEstimator,
};
use async_trait::async_trait;
use datafusion::common::tree_node::{TreeNode, TreeNodeRecursion};
use datafusion::common::{HashMap, Result, plan_err};
use datafusion::config::ConfigOptions;
use datafusion::physical_expr::Partitioning;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::execution_plan::CardinalityEffect;
use datafusion::physical_plan::joins::{HashJoinExec, PartitionMode};
use datafusion::physical_plan::repartition::RepartitionExec;
use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use datafusion::physical_plan::union::UnionExec;
use datafusion::physical_plan::{ExecutionPlan, PlanProperties};
use std::any::TypeId;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::atomic::{AtomicUsize, Ordering};
use uuid::Uuid;
/// Entry point of the network-boundary injection pass.
///
/// Sets up a fresh [`InjectNetworkBoundaryContext`] (random query id, empty task-count table,
/// stage counter starting at 1) and runs the recursive pass with `plan` as the root, i.e. with
/// no parent node.
///
/// * `plan` - physical plan to rewrite.
/// * `nb_builder` - strategy invoked for every network boundary that gets injected.
/// * `cfg` - session options; the [`DistributedConfig`] is read from them.
///
/// # Errors
///
/// Fails when the [`DistributedConfig`] cannot be obtained from `cfg`, or when the recursive
/// pass itself fails.
pub(crate) async fn inject_network_boundaries(
    plan: Arc<dyn ExecutionPlan>,
    nb_builder: impl NetworkBoundaryBuilder + Send + Sync,
    cfg: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
    let d_cfg = DistributedConfig::from_config_options(cfg)?;

    // The context only borrows its mutable state, so the owners live here for the whole pass.
    let task_counts = Mutex::new(HashMap::new());
    let stage_id = AtomicUsize::new(1);

    let ctx = InjectNetworkBoundaryContext {
        cfg,
        d_cfg,
        nb_builder: &nb_builder,
        task_counts: &task_counts,
        query_id: Uuid::new_v4(),
        stage_id: &stage_id,
    };
    _inject_network_boundaries(plan, None, &ctx).await
}
/// State shared by every step of one network-boundary injection pass.
///
/// Apart from the query id, every field is a shared reference, so the derived `Clone` copies the
/// references and the id, not the data behind them.
#[derive(Clone)]
pub(crate) struct InjectNetworkBoundaryContext<'a> {
/// DataFusion session options; handed to the task estimator on every call.
cfg: &'a ConfigOptions,
/// Distributed-specific settings: task limits, worker resolver, task estimator and feature
/// flags read throughout the pass.
d_cfg: &'a DistributedConfig,
/// Strategy invoked once for each network boundary that is injected.
nb_builder: &'a (dyn NetworkBoundaryBuilder + Send + Sync),
/// Task count recorded for each plan node, keyed by the address of the node's `Arc`
/// allocation (see `plan_ptr_key`).
task_counts: &'a Mutex<HashMap<usize, TaskCountAnnotation>>,
/// Query id stamped on every `LocalStage` created during the pass.
query_id: Uuid,
/// Counter from which stage numbers are drawn.
stage_id: &'a AtomicUsize,
}
impl<'a> InjectNetworkBoundaryContext<'a> {
    /// Upper bound on the number of tasks a single stage may use.
    ///
    /// A non-zero `max_tasks_per_stage` setting is returned unchanged. `0` falls back to the
    /// number of URLs reported by the worker resolver, but never less than 1.
    ///
    /// # Errors
    ///
    /// Propagates a failure of the worker resolver's `get_urls` call.
    fn max_tasks(&self) -> Result<usize> {
        let configured = self.d_cfg.max_tasks_per_stage;
        if configured != 0 {
            return Ok(configured);
        }
        let worker_urls = self.d_cfg.__private_worker_resolver.0.get_urls()?;
        Ok(worker_urls.len().max(1))
    }

    /// Records `task_count` for `plan`, replacing any entry stored under the same key.
    ///
    /// # Panics
    ///
    /// Panics if the task-count mutex is poisoned.
    fn set_task_count(&self, plan: &Arc<dyn ExecutionPlan>, task_count: TaskCountAnnotation) {
        let key = plan_ptr_key(plan);
        let mut table = self.task_counts.lock().expect("task counts mutex poisoned");
        table.insert(key, task_count);
    }

    /// Records `task_count` for `plan` and hands the same `plan` back, so that annotating can be
    /// done inline in an expression.
    fn plan_with_task_count(
        &self,
        plan: Arc<dyn ExecutionPlan>,
        task_count: TaskCountAnnotation,
    ) -> Arc<dyn ExecutionPlan> {
        self.set_task_count(&plan, task_count);
        plan
    }

    /// Looks up the task count previously recorded for `plan`.
    ///
    /// # Errors
    ///
    /// Returns a plan error when nothing was recorded for this node.
    ///
    /// # Panics
    ///
    /// Panics if the task-count mutex is poisoned.
    fn task_count(&self, plan: &Arc<dyn ExecutionPlan>) -> Result<TaskCountAnnotation> {
        let key = plan_ptr_key(plan);
        // The guard is a temporary of this statement, so the lock is released before the
        // error message below is built.
        let recorded = self
            .task_counts
            .lock()
            .expect("task counts mutex poisoned")
            .get(&key)
            .cloned();
        match recorded {
            Some(task_count) => Ok(task_count),
            None => plan_err!(
                "Missing task count for node {}. This is a bug in Distributed DataFusion's planner, please report it.",
                plan.name()
            ),
        }
    }

    /// Reserves the next stage number: returns the counter's current value and bumps it by one.
    fn fetch_add_stage_id(&self) -> usize {
        let reserved = self.stage_id.fetch_add(1, Ordering::Acquire);
        reserved
    }
}
/// Map key for a plan node: the address of the value behind the `Arc`.
///
/// The vtable half of the fat pointer is discarded, so two `Arc`s produce the same key exactly
/// when they point at the same allocation.
fn plan_ptr_key(plan: &Arc<dyn ExecutionPlan>) -> usize {
    let data_ptr: *const () = Arc::as_ptr(plan).cast();
    data_ptr as usize
}
async fn _inject_network_boundaries(
plan: Arc<dyn ExecutionPlan>,
parent: Option<&Arc<dyn ExecutionPlan>>,
nb_ctx: &InjectNetworkBoundaryContext<'_>,
) -> Result<Arc<dyn ExecutionPlan>> {
let broadcast_joins_enabled = nb_ctx.d_cfg.broadcast_joins;
let estimator = &nb_ctx.d_cfg.__private_task_estimator;
if plan.children().is_empty() {
return if let Some(estimate) = estimator.task_estimation(&plan, nb_ctx.cfg) {
Ok(nb_ctx.plan_with_task_count(plan, estimate.task_count.limit(nb_ctx.max_tasks()?)))
} else {
Ok(nb_ctx.plan_with_task_count(plan, Maximum(1)))
};
}
let mut futures = Vec::with_capacity(plan.children().len());
for child in plan.children() {
let child = Arc::clone(child);
futures.push(Box::pin(_inject_network_boundaries(
child,
Some(&plan),
nb_ctx,
)));
}
let processed_children = futures::future::try_join_all(futures).await?;
let mut task_count = estimator
.task_estimation(&plan, nb_ctx.cfg)
.map_or(Desired(1), |v| v.task_count);
if nb_ctx.d_cfg.children_isolator_unions && plan.is::<UnionExec>() {
let mut count = 0;
for processed_child in processed_children.iter() {
count += nb_ctx.task_count(processed_child)?.as_usize();
}
task_count = Desired(count);
} else if let Some(node) = plan.downcast_ref::<HashJoinExec>()
&& node.mode == PartitionMode::CollectLeft
&& !broadcast_joins_enabled
{
task_count = Maximum(1);
} else {
for processed_child in processed_children.iter() {
task_count = task_count.merge(nb_ctx.task_count(processed_child)?)
}
}
let plan = plan.with_new_children(processed_children)?;
task_count = task_count.limit(nb_ctx.max_tasks()?);
if let Some(r_exec) = plan.downcast_ref::<RepartitionExec>() {
if matches!(r_exec.partitioning(), Partitioning::Hash(_, _)) {
let input_stage = LocalStage {
query_id: nb_ctx.query_id,
num: nb_ctx.fetch_add_stage_id(),
plan: nb_ctx.plan_with_task_count(plan, task_count),
tasks: task_count.as_usize(),
};
let result = nb_ctx
.nb_builder
.build(input_stage, TypeId::of::<NetworkShuffleExec>(), nb_ctx)
.await?;
let nb = Arc::new(NetworkShuffleExec::from_stage(
result.input_stage,
result.input_properties,
));
return Ok(nb_ctx.plan_with_task_count(nb, result.consumer_task_count));
}
} else if let Some(parent) = parent
&& !plan.children().is_empty()
&& (parent.is::<CoalescePartitionsExec>()
|| parent.is::<SortPreservingMergeExec>())
{
return if plan.is::<BroadcastExec>() {
let input_stage = LocalStage {
query_id: nb_ctx.query_id,
num: nb_ctx.fetch_add_stage_id(),
plan: nb_ctx.plan_with_task_count(plan, task_count),
tasks: task_count.as_usize(),
};
let result = nb_ctx
.nb_builder
.build(input_stage, TypeId::of::<NetworkBroadcastExec>(), nb_ctx)
.await?;
let nb = Arc::new(NetworkBroadcastExec::from_stage(
result.input_stage,
result.input_properties,
));
Ok(nb_ctx.plan_with_task_count(nb, result.consumer_task_count))
} else {
let input_stage = LocalStage {
query_id: nb_ctx.query_id,
num: nb_ctx.fetch_add_stage_id(),
plan: nb_ctx.plan_with_task_count(plan, task_count),
tasks: task_count.as_usize(),
};
let result = nb_ctx
.nb_builder
.build(input_stage, TypeId::of::<NetworkCoalesceExec>(), nb_ctx)
.await?;
if !matches!(result.consumer_task_count, Maximum(1)) {
return plan_err!(
"A NetworkCoalesceExec must return exactly a Maximum(1) annotation above"
);
}
let nb = Arc::new(NetworkCoalesceExec::from_stage(
result.input_stage,
result.input_properties,
1,
));
Ok(nb_ctx.plan_with_task_count(nb, result.consumer_task_count))
};
}
if parent.is_none() {
nb_ctx.propagate_task_count_until_network_boundaries(&plan, task_count)
} else {
Ok(nb_ctx.plan_with_task_count(plan, task_count))
}
}
impl InjectNetworkBoundaryContext<'_> {
    /// Pushes `task_count` from `plan` down through its subtree, stopping at network boundaries,
    /// and returns the re-annotated (and possibly rebuilt) subtree.
    ///
    /// * A leaf is offered to the estimator's `scale_up_leaf_node`. When a replacement comes
    ///   back, every node of the replacement is annotated with `task_count`.
    /// * A network boundary is annotated and not descended into.
    /// * A `UnionExec`, when children isolation is enabled, becomes a
    ///   `ChildrenIsolatorUnionExec` weighted by the children's recorded task counts. Each child
    ///   then receives `Maximum(n)` with the share the union assigned to it.
    /// * Any other node is rebuilt on top of its re-annotated children.
    ///
    /// # Errors
    ///
    /// Fails when the estimator, a task-count lookup, the union construction or a
    /// `with_new_children` call fails.
    fn propagate_task_count_until_network_boundaries(
        &self,
        plan: &Arc<dyn ExecutionPlan>,
        task_count: TaskCountAnnotation,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        if plan.children().is_empty() {
            let replacement = self.d_cfg.__private_task_estimator.scale_up_leaf_node(
                plan,
                task_count.as_usize(),
                self.cfg,
            )?;
            let Some(scaled_up) = replacement else {
                return Ok(self.plan_with_task_count(Arc::clone(plan), task_count));
            };
            // The replacement may be a whole subtree, so annotate every node in it.
            scaled_up.apply(|node| {
                self.set_task_count(node, task_count);
                Ok(TreeNodeRecursion::Continue)
            })?;
            return Ok(self.plan_with_task_count(scaled_up, task_count));
        }

        if plan.is_network_boundary() {
            return Ok(self.plan_with_task_count(Arc::clone(plan), task_count));
        }

        if !(self.d_cfg.children_isolator_unions && plan.is::<UnionExec>()) {
            let new_children = plan
                .children()
                .into_iter()
                .map(|child| self.propagate_task_count_until_network_boundaries(child, task_count))
                .collect::<Result<Vec<_>>>()?;
            let rebuilt = Arc::clone(plan).with_new_children(new_children)?;
            return Ok(self.plan_with_task_count(rebuilt, task_count));
        }

        // Isolated union: split `task_count` between the children according to their weights.
        let children = plan.children();
        let weights = children
            .iter()
            .map(|child| match self.task_count(child)? {
                Desired(n) => Ok(ChildWeight::desired(n as f64)),
                Maximum(n) => Ok(ChildWeight::maximum(n)),
            })
            .collect::<Result<Vec<_>>>()?;
        let c_i_union = ChildrenIsolatorUnionExec::from_children_and_weights(
            children.iter().map(|child| Arc::clone(child)),
            weights,
            task_count.as_usize(),
        )?;

        let new_children = c_i_union
            .children()
            .into_iter()
            .zip(c_i_union.child_task_counts())
            .map(|(child, child_tasks)| {
                self.propagate_task_count_until_network_boundaries(child, Maximum(child_tasks))
            })
            .collect::<Result<Vec<_>>>()?;

        let c_i_union = Arc::new(c_i_union).with_new_children(new_children)?;
        Ok(self.plan_with_task_count(c_i_union, task_count))
    }
}
/// What a [`NetworkBoundaryBuilder`] hands back for one network boundary.
pub(crate) struct NetworkBoundaryBuilderResult {
/// Task count recorded for the network boundary node itself, i.e. for the consuming side.
pub(crate) consumer_task_count: TaskCountAnnotation,
/// Stage that feeds the boundary; passed to the boundary node's `from_stage` constructor.
pub(crate) input_stage: Stage,
/// Plan properties passed to the boundary node's `from_stage` constructor alongside the stage.
pub(crate) input_properties: Arc<PlanProperties>,
}
/// Strategy that prepares the input side of a network boundary.
///
/// The injection pass calls [`NetworkBoundaryBuilder::build`] once per boundary, right before it
/// constructs the boundary node from the returned stage and properties.
#[async_trait]
pub(crate) trait NetworkBoundaryBuilder {
/// Prepares the stage below a network boundary.
///
/// * `input_stage` - stage whose output will cross the boundary.
/// * `nb_type` - `TypeId` of the boundary node about to be created; the pass passes the id of
///   `NetworkShuffleExec`, `NetworkBroadcastExec` or `NetworkCoalesceExec`.
/// * `nb_ctx` - state shared by the injection pass.
///
/// Returns the stage and plan properties to build the boundary from, plus the task count to
/// record for the boundary node. For a `NetworkCoalesceExec` the pass rejects any task count
/// other than `Maximum(1)`.
async fn build<'a>(
&'a self,
input_stage: LocalStage,
nb_type: TypeId,
nb_ctx: &'a InjectNetworkBoundaryContext<'a>,
) -> Result<NetworkBoundaryBuilderResult>;
}
/// Lets any thread-safe callable act as a [`NetworkBoundaryBuilder`].
///
/// The callable receives the same three arguments as [`NetworkBoundaryBuilder::build`] and
/// returns, fallibly, a future that resolves to the builder result.
#[async_trait]
impl<T, F> NetworkBoundaryBuilder for T
where
    T: Fn(LocalStage, TypeId, &InjectNetworkBoundaryContext) -> Result<F> + Send + Sync,
    F: Future<Output = Result<NetworkBoundaryBuilderResult>> + Send,
{
    async fn build<'a>(
        &'a self,
        input_stage: LocalStage,
        nb_type: TypeId,
        nb_ctx: &'a InjectNetworkBoundaryContext<'a>,
    ) -> Result<NetworkBoundaryBuilderResult> {
        // The callable may fail synchronously, before it has produced any future.
        let pending = self(input_stage, nb_type, nb_ctx)?;
        pending.await
    }
}
/// [`NetworkBoundaryBuilder`] that sizes the consuming side of a boundary from the cardinality
/// effect of the nodes in the input stage.
pub(crate) struct CardinalityBasedNetworkBoundaryBuilder;

#[async_trait]
impl NetworkBoundaryBuilder for CardinalityBasedNetworkBoundaryBuilder {
    /// Pushes `Desired(input_stage.tasks)` down the input stage and then picks the consumer
    /// task count:
    ///
    /// * `Maximum(1)` when the boundary is a `NetworkCoalesceExec`;
    /// * otherwise `Desired(ceil(scale_factor * input_stage.tasks))`.
    ///
    /// # Errors
    ///
    /// Fails when propagating the task count through the input stage fails.
    async fn build<'a>(
        &'a self,
        mut input_stage: LocalStage,
        nb_type: TypeId,
        nb_ctx: &'a InjectNetworkBoundaryContext<'a>,
    ) -> Result<NetworkBoundaryBuilderResult> {
        // Scale factor of a subtree: 1.0 at a network boundary; otherwise the largest factor
        // among the children (1.0 when there are none), divided by the configured factor for a
        // node that reduces cardinality and multiplied by it for a node that increases it.
        fn scale_factor(plan: &Arc<dyn ExecutionPlan>, d_cfg: &DistributedConfig) -> f64 {
            if plan.is_network_boundary() {
                return 1.0;
            }
            let from_children = plan
                .children()
                .into_iter()
                .map(|child| scale_factor(child, d_cfg))
                .reduce(f64::max)
                .unwrap_or(1.0);
            let factor = d_cfg.cardinality_task_count_factor;
            match plan.cardinality_effect() {
                CardinalityEffect::LowerEqual => from_children / factor,
                CardinalityEffect::GreaterEqual => from_children * factor,
                _ => from_children,
            }
        }

        let propagated = nb_ctx.propagate_task_count_until_network_boundaries(
            &input_stage.plan,
            Desired(input_stage.tasks),
        )?;
        input_stage.plan = propagated;
        let input_properties = Arc::clone(input_stage.plan.properties());

        let consumer_task_count = if nb_type == TypeId::of::<NetworkCoalesceExec>() {
            Maximum(1)
        } else {
            let factor = scale_factor(&input_stage.plan, nb_ctx.d_cfg);
            Desired((factor * input_stage.tasks as f64).ceil() as usize)
        };

        Ok(NetworkBoundaryBuilderResult {
            consumer_task_count,
            input_stage: Stage::Local(input_stage),
            input_properties,
        })
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::distributed_planner::insert_broadcast::insert_broadcast_execs;
use crate::test_utils::plans::{BuildSideOneTaskEstimator, TestPlanBuilder};
use crate::{TaskEstimation, TaskEstimator, assert_snapshot};
use datafusion::config::ConfigOptions;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
/// A plain `SELECT *` yields a single annotated node at `Desired(4)` and no network boundary.
#[tokio::test]
async fn test_select_all() {
let query = r#"
SELECT * FROM weather
"#;
let test_plan_builder = TestPlanBuilder::new()
.target_partitions(4)
.num_workers(4)
.distributed_planner(false)
.broadcast_joins(false);
let annotated = annotate_test_plan(test_plan_builder, query).await;
assert_snapshot!(annotated, @"DataSourceExec: task_count=Desired(4)")
}
/// GROUP BY with ORDER BY: expects a `NetworkShuffleExec` above the `RepartitionExec` and a
/// `NetworkCoalesceExec` right under the `SortPreservingMergeExec`. The stage after the shuffle
/// is annotated `Desired(3)` while the scan stage stays at `Desired(4)`.
#[tokio::test]
async fn test_aggregation() {
let query = r#"
SELECT count(*), "RainToday" FROM weather GROUP BY "RainToday" ORDER BY count(*)
"#;
let test_plan_builder = TestPlanBuilder::new()
.target_partitions(4)
.num_workers(4)
.distributed_planner(false)
.broadcast_joins(false);
let annotated = annotate_test_plan(test_plan_builder, query).await;
assert_snapshot!(annotated, @r"
SortPreservingMergeExec: task_count=Maximum(1)
NetworkCoalesceExec: task_count=Maximum(1)
SortExec: task_count=Desired(3)
ProjectionExec: task_count=Desired(3)
AggregateExec: task_count=Desired(3)
NetworkShuffleExec: task_count=Desired(3)
RepartitionExec: task_count=Desired(4)
AggregateExec: task_count=Desired(4)
DistributedLeafExec: task_count=Desired(4)
")
}
/// With broadcast joins disabled, every node of this LEFT JOIN plan is annotated `Maximum(1)`
/// and no network boundary is injected.
#[tokio::test]
async fn test_left_join() {
let query = r#"
SELECT a."MinTemp", b."MaxTemp" FROM weather a LEFT JOIN weather b ON a."RainToday" = b."RainToday"
"#;
let test_plan_builder = TestPlanBuilder::new()
.target_partitions(4)
.num_workers(4)
.distributed_planner(false)
.broadcast_joins(false);
let annotated = annotate_test_plan(test_plan_builder, query).await;
assert_snapshot!(annotated, @r"
HashJoinExec: task_count=Maximum(1)
CoalescePartitionsExec: task_count=Maximum(1)
DistributedLeafExec: task_count=Maximum(1)
DistributedLeafExec: task_count=Maximum(1)
")
}
/// LEFT JOIN of two aggregated CTEs with broadcast joins disabled: the join stays at
/// `Maximum(1)`, while the aggregations feeding it still run in their own stages behind
/// `NetworkShuffleExec` / `NetworkCoalesceExec` boundaries.
#[tokio::test]
async fn test_left_join_distributed() {
let query = r#"
WITH a AS (
SELECT
AVG("MinTemp") as "MinTemp",
"RainTomorrow"
FROM weather
WHERE "RainToday" = 'yes'
GROUP BY "RainTomorrow"
), b AS (
SELECT
AVG("MaxTemp") as "MaxTemp",
"RainTomorrow"
FROM weather
WHERE "RainToday" = 'no'
GROUP BY "RainTomorrow"
)
SELECT
a."MinTemp",
b."MaxTemp"
FROM a
LEFT JOIN b
ON a."RainTomorrow" = b."RainTomorrow"
"#;
let test_plan_builder = TestPlanBuilder::new()
.target_partitions(4)
.num_workers(4)
.distributed_planner(false)
.broadcast_joins(false);
let annotated = annotate_test_plan(test_plan_builder, query).await;
assert_snapshot!(annotated, @r"
HashJoinExec: task_count=Maximum(1)
CoalescePartitionsExec: task_count=Maximum(1)
NetworkCoalesceExec: task_count=Maximum(1)
ProjectionExec: task_count=Desired(2)
AggregateExec: task_count=Desired(2)
NetworkShuffleExec: task_count=Desired(2)
RepartitionExec: task_count=Desired(4)
AggregateExec: task_count=Desired(4)
FilterExec: task_count=Desired(4)
RepartitionExec: task_count=Desired(4)
DistributedLeafExec: task_count=Desired(4)
ProjectionExec: task_count=Maximum(1)
AggregateExec: task_count=Maximum(1)
NetworkShuffleExec: task_count=Maximum(1)
RepartitionExec: task_count=Desired(4)
AggregateExec: task_count=Desired(4)
FilterExec: task_count=Desired(4)
RepartitionExec: task_count=Desired(4)
DistributedLeafExec: task_count=Desired(4)
")
}
/// INNER JOIN with broadcast joins disabled: same outcome as the LEFT JOIN case, everything is
/// annotated `Maximum(1)` and no network boundary is injected.
#[tokio::test]
async fn test_inner_join() {
let query = r#"
SELECT a."MinTemp", b."MaxTemp" FROM weather a INNER JOIN weather b ON a."RainToday" = b."RainToday"
"#;
let test_plan_builder = TestPlanBuilder::new()
.target_partitions(4)
.num_workers(4)
.distributed_planner(false)
.broadcast_joins(false);
let annotated = annotate_test_plan(test_plan_builder, query).await;
assert_snapshot!(annotated, @r"
HashJoinExec: task_count=Maximum(1)
CoalescePartitionsExec: task_count=Maximum(1)
DistributedLeafExec: task_count=Maximum(1)
DistributedLeafExec: task_count=Maximum(1)
")
}
/// DISTINCT: two `AggregateExec` nodes separated by a `NetworkShuffleExec`; the side above the
/// shuffle is `Desired(3)`, the side below it `Desired(4)`.
#[tokio::test]
async fn test_distinct() {
let query = r#"
SELECT DISTINCT "RainToday" FROM weather
"#;
let test_plan_builder = TestPlanBuilder::new()
.target_partitions(4)
.num_workers(4)
.distributed_planner(false)
.broadcast_joins(false);
let annotated = annotate_test_plan(test_plan_builder, query).await;
assert_snapshot!(annotated, @r"
AggregateExec: task_count=Desired(3)
NetworkShuffleExec: task_count=Desired(3)
RepartitionExec: task_count=Desired(4)
AggregateExec: task_count=Desired(4)
DistributedLeafExec: task_count=Desired(4)
")
}
/// Two-branch UNION ALL: the union becomes a `ChildrenIsolatorUnionExec` at `Desired(4)` and
/// each branch is annotated `Maximum(2)`.
#[tokio::test]
async fn test_union_all() {
let query = r#"
SELECT "MinTemp" FROM weather WHERE "RainToday" = 'yes'
UNION ALL
SELECT "MaxTemp" FROM weather WHERE "RainToday" = 'no'
"#;
let test_plan_builder = TestPlanBuilder::new()
.target_partitions(4)
.num_workers(4)
.distributed_planner(false)
.broadcast_joins(false);
let annotated = annotate_test_plan(test_plan_builder, query).await;
assert_snapshot!(annotated, @r"
ChildrenIsolatorUnionExec: task_count=Desired(4)
FilterExec: task_count=Maximum(2)
RepartitionExec: task_count=Maximum(2)
DistributedLeafExec: task_count=Maximum(2)
ProjectionExec: task_count=Maximum(2)
FilterExec: task_count=Maximum(2)
RepartitionExec: task_count=Maximum(2)
DistributedLeafExec: task_count=Maximum(2)
")
}
/// Filtered subquery: the plan stays in one stage, every node annotated `Desired(4)`, and the
/// `RepartitionExec` in it gets no network boundary.
#[tokio::test]
async fn test_subquery() {
let query = r#"
SELECT * FROM (
SELECT "MinTemp", "MaxTemp" FROM weather WHERE "RainToday" = 'yes'
) AS subquery WHERE "MinTemp" > 5
"#;
let test_plan_builder = TestPlanBuilder::new()
.target_partitions(4)
.num_workers(4)
.distributed_planner(false)
.broadcast_joins(false);
let annotated = annotate_test_plan(test_plan_builder, query).await;
assert_snapshot!(annotated, @r"
FilterExec: task_count=Desired(4)
RepartitionExec: task_count=Desired(4)
DistributedLeafExec: task_count=Desired(4)
")
}
/// Window function with PARTITION BY: a `NetworkShuffleExec` is placed above the
/// `RepartitionExec`, and both sides of it are annotated `Desired(4)`.
#[tokio::test]
async fn test_window_function() {
let query = r#"
SELECT "MinTemp", ROW_NUMBER() OVER (PARTITION BY "RainToday" ORDER BY "MinTemp") as rn
FROM weather
"#;
let test_plan_builder = TestPlanBuilder::new()
.target_partitions(4)
.num_workers(4)
.distributed_planner(false)
.broadcast_joins(false);
let annotated = annotate_test_plan(test_plan_builder, query).await;
assert_snapshot!(annotated, @r"
ProjectionExec: task_count=Desired(4)
BoundedWindowAggExec: task_count=Desired(4)
SortExec: task_count=Desired(4)
NetworkShuffleExec: task_count=Desired(4)
RepartitionExec: task_count=Desired(4)
DistributedLeafExec: task_count=Desired(4)
")
}
/// Three-branch UNION ALL on four workers: the `ChildrenIsolatorUnionExec` is `Desired(4)` and
/// its branches are annotated `Maximum(2)`, `Maximum(1)` and `Maximum(1)`.
#[tokio::test]
async fn test_children_isolator_union() {
let query = r#"
SELECT "MinTemp" FROM weather WHERE "RainToday" = 'yes'
UNION ALL
SELECT "MaxTemp" FROM weather WHERE "RainToday" = 'no'
UNION ALL
SELECT "Rainfall" FROM weather WHERE "RainTomorrow" = 'yes'
"#;
let test_plan_builder = TestPlanBuilder::new()
.target_partitions(4)
.num_workers(4)
.distributed_planner(false)
.broadcast_joins(false);
let annotated = annotate_test_plan(test_plan_builder, query).await;
assert_snapshot!(annotated, @r"
ChildrenIsolatorUnionExec: task_count=Desired(4)
FilterExec: task_count=Maximum(2)
RepartitionExec: task_count=Maximum(2)
DistributedLeafExec: task_count=Maximum(2)
ProjectionExec: task_count=Maximum(1)
FilterExec: task_count=Maximum(1)
RepartitionExec: task_count=Maximum(1)
DistributedLeafExec: task_count=Maximum(1)
ProjectionExec: task_count=Maximum(1)
FilterExec: task_count=Maximum(1)
RepartitionExec: task_count=Maximum(1)
DistributedLeafExec: task_count=Maximum(1)
")
}
/// An estimator that answers `maximum(1)` for `RepartitionExec` (a non-leaf node) brings the
/// whole DISTINCT plan, on both sides of the shuffle, down to a task count of 1.
#[tokio::test]
async fn test_intermediate_task_estimator() {
let query = r#"
SELECT DISTINCT "RainToday" FROM weather
"#;
let task_estimator: Arc<dyn TaskEstimator + Send + Sync + 'static> =
Arc::new(CallbackEstimator::new(|_: &RepartitionExec| {
Some(TaskEstimation::maximum(1))
}));
let test_plan_builder = TestPlanBuilder::new()
.target_partitions(4)
.num_workers(4)
.distributed_planner(false)
.broadcast_joins(false)
.distributed_task_estimator(task_estimator);
let annotated = annotate_test_plan(test_plan_builder, query).await;
assert_snapshot!(annotated, @r"
AggregateExec: task_count=Desired(1)
NetworkShuffleExec: task_count=Desired(1)
RepartitionExec: task_count=Desired(1)
AggregateExec: task_count=Desired(1)
DistributedLeafExec: task_count=Desired(1)
")
}
/// Same `maximum(1)` estimator for `RepartitionExec`, applied to a two-branch UNION ALL: each
/// branch ends up at `Maximum(1)` and the union at `Desired(2)`.
#[tokio::test]
async fn test_union_all_limited_by_intermediate_estimator() {
let query = r#"
SELECT "MinTemp" FROM weather WHERE "RainToday" = 'yes'
UNION ALL
SELECT "MaxTemp" FROM weather WHERE "RainToday" = 'no'
"#;
let task_estimator: Arc<dyn TaskEstimator + Send + Sync + 'static> =
Arc::new(CallbackEstimator::new(|_: &RepartitionExec| {
Some(TaskEstimation::maximum(1))
}));
let test_plan_builder = TestPlanBuilder::new()
.target_partitions(4)
.num_workers(4)
.distributed_planner(false)
.broadcast_joins(false)
.distributed_task_estimator(task_estimator);
let annotated = annotate_test_plan(test_plan_builder, query).await;
assert_snapshot!(annotated, @r"
ChildrenIsolatorUnionExec: task_count=Desired(2)
FilterExec: task_count=Maximum(1)
RepartitionExec: task_count=Maximum(1)
DistributedLeafExec: task_count=Maximum(1)
ProjectionExec: task_count=Maximum(1)
FilterExec: task_count=Maximum(1)
RepartitionExec: task_count=Maximum(1)
DistributedLeafExec: task_count=Maximum(1)
")
}
/// With broadcast joins enabled, a `NetworkBroadcastExec` is placed above the `BroadcastExec`
/// and every node, including the join, is annotated `Desired(4)`.
#[tokio::test]
async fn test_broadcast_join_annotation() {
let query = r#"
SELECT a."MinTemp", b."MaxTemp"
FROM weather a INNER JOIN weather b
ON a."RainToday" = b."RainToday"
"#;
let test_plan_builder = TestPlanBuilder::new()
.target_partitions(4)
.num_workers(4)
.distributed_planner(false)
.broadcast_joins(true);
let annotated = annotate_test_plan(test_plan_builder, query).await;
assert_snapshot!(annotated, @r"
HashJoinExec: task_count=Desired(4)
CoalescePartitionsExec: task_count=Desired(4)
NetworkBroadcastExec: task_count=Desired(4)
BroadcastExec: task_count=Desired(4)
DistributedLeafExec: task_count=Desired(4)
DistributedLeafExec: task_count=Desired(4)
");
}
/// Broadcast join whose build side is a bare data source.
///
/// First pins the single-partition physical plan (a `CollectLeft` `HashJoinExec` directly over
/// two `DataSourceExec` nodes), then checks that the annotated plan still contains the broadcast
/// nodes, all at `Desired(4)`.
#[tokio::test]
async fn test_broadcast_datasource_as_build_child() {
let query = r#"
SELECT a."MinTemp", b."MaxTemp"
FROM weather a INNER JOIN weather b
ON a."RainToday" = b."RainToday"
"#;
let physical_plan_string = TestPlanBuilder::new()
.target_partitions(1)
.num_workers(4)
.build()
.await
.physical_plan_as_string(query)
.await;
assert_snapshot!(physical_plan_string, @r"
HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(RainToday@1, RainToday@1)], projection=[MinTemp@0, MaxTemp@2]
DataSourceExec: file_groups={1 group: [[/testdata/weather/result-000000.parquet, /testdata/weather/result-000001.parquet, /testdata/weather/result-000002.parquet]]}, projection=[MinTemp, RainToday], file_type=parquet
DataSourceExec: file_groups={1 group: [[/testdata/weather/result-000000.parquet, /testdata/weather/result-000001.parquet, /testdata/weather/result-000002.parquet]]}, projection=[MaxTemp, RainToday], file_type=parquet, predicate=DynamicFilter [ empty ]
");
let test_plan_builder = TestPlanBuilder::new()
.target_partitions(1)
.num_workers(4)
.distributed_planner(false)
.broadcast_joins(true);
let annotated = annotate_test_plan(test_plan_builder, query).await;
assert!(annotated.contains("Broadcast"));
assert_snapshot!(annotated, @r"
HashJoinExec: task_count=Desired(4)
CoalescePartitionsExec: task_count=Desired(4)
NetworkBroadcastExec: task_count=Desired(4)
BroadcastExec: task_count=Desired(4)
DistributedLeafExec: task_count=Desired(4)
DistributedLeafExec: task_count=Desired(4)
");
}
/// Broadcast from one task to many: with `BuildSideOneTaskEstimator` and three workers, the
/// stage below the `NetworkBroadcastExec` is `Desired(1)` while the join stage is `Desired(3)`.
#[tokio::test]
async fn test_broadcast_one_to_many() {
let query = r#"
SELECT a."MinTemp", b."MaxTemp"
FROM weather a INNER JOIN weather b
ON a."RainToday" = b."RainToday"
"#;
let test_plan_builder = TestPlanBuilder::new()
.target_partitions(4)
.num_workers(3)
.distributed_planner(false)
.broadcast_joins(true)
.distributed_task_estimator(BuildSideOneTaskEstimator);
let annotated = annotate_test_plan(test_plan_builder, query).await;
assert_snapshot!(annotated, @r"
HashJoinExec: task_count=Desired(3)
CoalescePartitionsExec: task_count=Desired(3)
NetworkBroadcastExec: task_count=Desired(3)
BroadcastExec: task_count=Desired(1)
DistributedLeafExec: task_count=Desired(1)
DistributedLeafExec: task_count=Desired(3)
");
}
/// A `maximum(1)` estimate on the `CoalescePartitionsExec` that sits above a `BroadcastExec`
/// caps the whole join stage at `Maximum(1)`, while the broadcast build stage below the
/// `NetworkBroadcastExec` keeps `Desired(3)`.
#[tokio::test]
async fn test_broadcast_build_coalesce_caps_join_stage() {
let query = r#"
SELECT a."MinTemp", b."MaxTemp"
FROM weather a INNER JOIN weather b
ON a."RainToday" = b."RainToday"
"#;
let test_plan_builder = TestPlanBuilder::new()
.target_partitions(4)
.num_workers(3)
.distributed_planner(false)
.broadcast_joins(true)
.distributed_task_estimator(BroadcastBuildCoalesceMaxEstimator);
let annotated = annotate_test_plan(test_plan_builder, query).await;
assert_snapshot!(annotated, @r"
HashJoinExec: task_count=Maximum(1)
CoalescePartitionsExec: task_count=Maximum(1)
NetworkBroadcastExec: task_count=Maximum(1)
BroadcastExec: task_count=Desired(3)
DistributedLeafExec: task_count=Desired(3)
DistributedLeafExec: task_count=Maximum(1)
");
}
/// With broadcast joins disabled, no node whose name contains "Broadcast" appears in the
/// annotated plan, and the join is pinned to `Maximum(1)`.
#[tokio::test]
async fn test_broadcast_disabled_default() {
let query = r#"
SELECT a."MinTemp", b."MaxTemp"
FROM weather a INNER JOIN weather b
ON a."RainToday" = b."RainToday"
"#;
let test_plan_builder = TestPlanBuilder::new()
.target_partitions(4)
.num_workers(4)
.distributed_planner(false)
.broadcast_joins(false);
let annotated = annotate_test_plan(test_plan_builder, query).await;
assert!(!annotated.contains("Broadcast"));
assert_snapshot!(annotated, @r"
HashJoinExec: task_count=Maximum(1)
CoalescePartitionsExec: task_count=Maximum(1)
DistributedLeafExec: task_count=Maximum(1)
DistributedLeafExec: task_count=Maximum(1)
")
}
/// Two chained broadcast joins: each join gets its own `NetworkBroadcastExec` over a
/// `BroadcastExec`, and every node is annotated `Desired(4)`.
#[tokio::test]
async fn test_broadcast_multi_join_chain() {
let query = r#"
SELECT a."MinTemp", b."MaxTemp", c."Rainfall"
FROM weather a
INNER JOIN weather b ON a."RainToday" = b."RainToday"
INNER JOIN weather c ON b."RainToday" = c."RainToday"
"#;
let test_plan_builder = TestPlanBuilder::new()
.target_partitions(4)
.num_workers(4)
.distributed_planner(false)
.broadcast_joins(true);
let annotated = annotate_test_plan(test_plan_builder, query).await;
assert_snapshot!(annotated, @r"
HashJoinExec: task_count=Desired(4)
CoalescePartitionsExec: task_count=Desired(4)
NetworkBroadcastExec: task_count=Desired(4)
BroadcastExec: task_count=Desired(4)
HashJoinExec: task_count=Desired(4)
CoalescePartitionsExec: task_count=Desired(4)
NetworkBroadcastExec: task_count=Desired(4)
BroadcastExec: task_count=Desired(4)
DistributedLeafExec: task_count=Desired(4)
DistributedLeafExec: task_count=Desired(4)
DistributedLeafExec: task_count=Desired(4)
")
}
/// Union of three broadcast joins with children isolation enabled: the union is `Desired(4)`,
/// the join stages are capped at `Maximum(2)`, `Maximum(1)` and `Maximum(1)`, and every broadcast
/// build stage below a `NetworkBroadcastExec` keeps `Desired(4)`.
#[tokio::test]
async fn test_broadcast_union_children_isolator_annotation() {
let query = r#"
SELECT a."MinTemp", b."MaxTemp"
FROM weather a INNER JOIN weather b
ON a."RainToday" = b."RainToday"
UNION ALL
SELECT a."MinTemp", b."MaxTemp"
FROM weather a INNER JOIN weather b
ON a."RainToday" = b."RainToday"
UNION ALL
SELECT a."MinTemp", b."MaxTemp"
FROM weather a INNER JOIN weather b
ON a."RainToday" = b."RainToday"
"#;
let test_plan_builder = TestPlanBuilder::new()
.target_partitions(4)
.num_workers(4)
.distributed_planner(false)
.broadcast_joins(true)
.distributed_children_isolator_unions(true);
let annotated = annotate_test_plan(test_plan_builder, query).await;
assert_snapshot!(annotated, @r"
ChildrenIsolatorUnionExec: task_count=Desired(4)
HashJoinExec: task_count=Maximum(2)
CoalescePartitionsExec: task_count=Maximum(2)
NetworkBroadcastExec: task_count=Maximum(2)
BroadcastExec: task_count=Desired(4)
DistributedLeafExec: task_count=Desired(4)
DistributedLeafExec: task_count=Maximum(2)
HashJoinExec: task_count=Maximum(1)
CoalescePartitionsExec: task_count=Maximum(1)
NetworkBroadcastExec: task_count=Maximum(1)
BroadcastExec: task_count=Desired(4)
DistributedLeafExec: task_count=Desired(4)
DistributedLeafExec: task_count=Maximum(1)
HashJoinExec: task_count=Maximum(1)
CoalescePartitionsExec: task_count=Maximum(1)
NetworkBroadcastExec: task_count=Maximum(1)
BroadcastExec: task_count=Desired(4)
DistributedLeafExec: task_count=Desired(4)
DistributedLeafExec: task_count=Maximum(1)
");
}
/// Test-only [`TaskEstimator`] backed by a closure that is consulted for one concrete node type
/// only; every other node gets no estimation.
// The callback type is long but used only here, so the lint is silenced instead of adding an alias.
#[allow(clippy::type_complexity)]
struct CallbackEstimator {
    f: Arc<dyn Fn(&dyn ExecutionPlan) -> Option<TaskEstimation> + Send + Sync>,
}

impl CallbackEstimator {
    /// Wraps `f` so that it only runs for nodes whose concrete type is `T`.
    fn new<T: ExecutionPlan + 'static>(
        f: impl Fn(&T) -> Option<TaskEstimation> + Send + Sync + 'static,
    ) -> Self {
        Self {
            f: Arc::new(move |plan: &dyn ExecutionPlan| -> Option<TaskEstimation> {
                plan.downcast_ref::<T>().and_then(|typed| f(typed))
            }),
        }
    }
}

impl TaskEstimator for CallbackEstimator {
    /// Delegates to the wrapped callback.
    fn task_estimation(
        &self,
        plan: &Arc<dyn ExecutionPlan>,
        _: &ConfigOptions,
    ) -> Option<TaskEstimation> {
        (self.f)(&**plan)
    }

    /// Never replaces a leaf node.
    fn scale_up_leaf_node(
        &self,
        _: &Arc<dyn ExecutionPlan>,
        _: usize,
        _: &ConfigOptions,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        Ok(None)
    }
}
/// Test-only [`TaskEstimator`] that answers `maximum(1)` for a `CoalescePartitionsExec` whose
/// input is a `BroadcastExec`, and has no opinion about any other node.
#[derive(Debug)]
struct BroadcastBuildCoalesceMaxEstimator;

impl TaskEstimator for BroadcastBuildCoalesceMaxEstimator {
    fn task_estimation(
        &self,
        plan: &Arc<dyn ExecutionPlan>,
        _: &ConfigOptions,
    ) -> Option<TaskEstimation> {
        plan.downcast_ref::<CoalescePartitionsExec>()
            .filter(|coalesce| coalesce.input().is::<BroadcastExec>())
            .map(|_| TaskEstimation::maximum(1))
    }

    /// Never replaces a leaf node.
    fn scale_up_leaf_node(
        &self,
        _: &Arc<dyn ExecutionPlan>,
        _: usize,
        _: &ConfigOptions,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        Ok(None)
    }
}
/// Plans `query` with `test_plan_builder`, inserts broadcast nodes, runs the boundary-injection
/// pass with [`CardinalityBasedNetworkBoundaryBuilder`] and renders the annotated plan as text.
///
/// # Panics
///
/// Panics when inserting broadcasts, reading the distributed config or annotating the plan
/// fails.
async fn annotate_test_plan(test_plan_builder: TestPlanBuilder, query: &str) -> String {
    let test_plan = test_plan_builder.build().await;
    let physical = test_plan.physical_plan(query).await;
    let session_config = test_plan.get_ctx().copied_config();
    let with_broadcasts = insert_broadcast_execs(physical, session_config.options())
        .expect("failed to insert broadcasts");

    // Built by hand rather than through `inject_network_boundaries` so that the task-count
    // table is still reachable for rendering once the pass has finished.
    let task_counts = Mutex::new(HashMap::new());
    let stage_id = AtomicUsize::new(1);
    let ctx = InjectNetworkBoundaryContext {
        cfg: session_config.options(),
        d_cfg: DistributedConfig::from_config_options(session_config.options()).unwrap(),
        task_counts: &task_counts,
        query_id: Uuid::new_v4(),
        stage_id: &stage_id,
        nb_builder: &CardinalityBasedNetworkBoundaryBuilder,
    };

    let annotated = _inject_network_boundaries(with_broadcasts, None, &ctx)
        .await
        .expect("failed to annotate plan");
    debug_annotated(&annotated, 0, &ctx)
}
/// Renders `plan` as one line per node, `<name>: task_count=<annotation>`, in pre-order, with
/// one extra indentation unit per tree level.
///
/// # Panics
///
/// Panics when a node has no recorded task count.
fn debug_annotated(
    plan: &Arc<dyn ExecutionPlan>,
    indent: usize,
    ctx: &InjectNetworkBoundaryContext,
) -> String {
    let padding = " ".repeat(indent);
    let annotation = ctx.task_count(plan).unwrap();
    let mut rendered = format!("{}{}: task_count={:?}\n", padding, plan.name(), annotation);
    rendered.extend(
        plan.children()
            .into_iter()
            .map(|child| debug_annotated(child, indent + 1, ctx)),
    );
    rendered
}
}