Skip to main content

datafusion_distributed/distributed_planner/
distributed_physical_optimizer_rule.rs

1use crate::common::require_one_child;
2use crate::distributed_planner::batch_coalescing_below_network_boundaries;
3use crate::distributed_planner::plan_annotator::{
4    AnnotatedPlan, PlanOrNetworkBoundary, annotate_plan,
5};
6use crate::{
7    DistributedConfig, DistributedExec, NetworkBroadcastExec, NetworkCoalesceExec,
8    NetworkShuffleExec, TaskEstimator,
9};
10use datafusion::config::ConfigOptions;
11use datafusion::error::DataFusionError;
12use datafusion::physical_optimizer::PhysicalOptimizerRule;
13use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
14use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties};
15use std::fmt::Debug;
16use std::ops::AddAssign;
17use std::sync::Arc;
18use uuid::Uuid;
19
20use super::insert_broadcast::insert_broadcast_execs;
21
/// Physical optimizer rule that inspects the plan, places the appropriate network
/// boundaries, and breaks it down into stages that can be executed in a distributed manner.
///
/// A plan whose root is already a [DistributedExec] is returned untouched. For any other
/// plan, the rule goes through the following steps:
///
/// 1. If the plan outputs more than one partition, wrap it in a [CoalescePartitionsExec].
///
/// 2. Run [insert_broadcast_execs] over the plan, before any annotation happens.
///
/// 3. Annotate the plan with [annotate_plan]: adds some annotations to each node about how
///    many distributed tasks should be used in the stage containing them, and whether they
///    need a network boundary below or not.
///    For more information about this step, read [annotate_plan] docs.
///
/// 4. Based on the [AnnotatedPlan] returned by [annotate_plan], place all the appropriate
///    network boundaries ([NetworkShuffleExec], [NetworkCoalesceExec] and
///    [NetworkBroadcastExec]) with the task count assignation that the annotations required.
///    After this, the plan is already a distributed executable plan. If this step did not
///    place a single network boundary, the original plan is returned as it was received.
///
/// 5. Run [batch_coalescing_below_network_boundaries], which places `CoalesceBatchesExec`
///    nodes just below the network boundaries, so that we send fewer and bigger record
///    batches over the wire instead of a lot of small ones. The result is finally wrapped
///    in a [DistributedExec].
#[derive(Debug, Default)]
pub struct DistributedPhysicalOptimizerRule;
41
42impl PhysicalOptimizerRule for DistributedPhysicalOptimizerRule {
43    fn optimize(
44        &self,
45        original: Arc<dyn ExecutionPlan>,
46        cfg: &ConfigOptions,
47    ) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
48        if original.as_any().is::<DistributedExec>() {
49            return Ok(original);
50        }
51
52        let mut plan = Arc::clone(&original);
53        if original.output_partitioning().partition_count() > 1 {
54            plan = Arc::new(CoalescePartitionsExec::new(plan))
55        }
56
57        plan = insert_broadcast_execs(plan, cfg)?;
58
59        let annotated = annotate_plan(plan, cfg)?;
60
61        let mut stage_id = 1;
62        let distributed = distribute_plan(annotated, cfg, Uuid::new_v4(), &mut stage_id)?;
63        if stage_id == 1 {
64            return Ok(original);
65        }
66        let distributed = batch_coalescing_below_network_boundaries(distributed, cfg)?;
67
68        Ok(Arc::new(DistributedExec::new(distributed)))
69    }
70
71    fn name(&self) -> &str {
72        "DistributedPhysicalOptimizer"
73    }
74
75    fn schema_check(&self) -> bool {
76        true
77    }
78}
79
80/// Takes an [AnnotatedPlan] and returns a modified [ExecutionPlan] with all the network boundaries
81/// appropriately placed. This step performs the following modifications to the original
82/// [ExecutionPlan]:
83/// - The leaf nodes are scaled up in parallelism based on the number of distributed tasks in
84///   which they are going to run. This is configurable by the user via the [TaskEstimator] trait.
85/// - The appropriate network boundaries are placed in the plan depending on how it was annotated,
86///   so new nodes like [NetworkBroadcastExec], [NetworkCoalesceExec] and [NetworkShuffleExec] will be present.
87fn distribute_plan(
88    annotated_plan: AnnotatedPlan,
89    cfg: &ConfigOptions,
90    query_id: Uuid,
91    stage_id: &mut usize,
92) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
93    let d_cfg = DistributedConfig::from_config_options(cfg)?;
94    let children = annotated_plan.children;
95    let task_count = annotated_plan.task_count.as_usize();
96    let max_child_task_count = children.iter().map(|v| v.task_count.as_usize()).max();
97    let new_children = children
98        .into_iter()
99        .map(|child| distribute_plan(child, cfg, query_id, stage_id))
100        .collect::<Result<Vec<_>, _>>()?;
101    match annotated_plan.plan_or_nb {
102        // This is a leaf node. It needs to be scaled up in order to account for it running in
103        // multiple tasks.
104        PlanOrNetworkBoundary::Plan(plan) if plan.children().is_empty() => {
105            let scaled_up = d_cfg.__private_task_estimator.scale_up_leaf_node(
106                &plan,
107                annotated_plan.task_count.as_usize(),
108                cfg,
109            );
110            Ok(scaled_up.unwrap_or(plan))
111        }
112        // This is a normal intermediate plan, just pass it through with the mapped children.
113        PlanOrNetworkBoundary::Plan(plan) => plan.with_new_children(new_children),
114        // This is a shuffle, so inject a NetworkShuffleExec here in the plan.
115        PlanOrNetworkBoundary::Shuffle => {
116            // It would need a network boundary, but on both sides of the boundary there is just 1 task,
117            // so we are fine with not introducing any network boundary.
118            if task_count == 1 && max_child_task_count == Some(1) {
119                return require_one_child(new_children);
120            }
121            let node = Arc::new(NetworkShuffleExec::try_new(
122                require_one_child(new_children)?,
123                query_id,
124                *stage_id,
125                task_count,
126                max_child_task_count.unwrap_or(1),
127            )?);
128            stage_id.add_assign(1);
129            Ok(node)
130        }
131        // DataFusion is trying to coalesce multiple partitions into one, so we should do the
132        // same with tasks.
133        PlanOrNetworkBoundary::Coalesce => {
134            // It would need a network boundary, but on both sides of the boundary there is just 1 task,
135            // so we are fine with not introducing any network boundary.
136            if task_count == 1 && max_child_task_count == Some(1) {
137                return require_one_child(new_children);
138            }
139            let node = Arc::new(NetworkCoalesceExec::try_new(
140                require_one_child(new_children)?,
141                query_id,
142                *stage_id,
143                task_count,
144                max_child_task_count.unwrap_or(1),
145            )?);
146            stage_id.add_assign(1);
147            Ok(node)
148        }
149        // This is a CollectLeft HashJoinExec with the build side marked as being broadcast. we
150        // need to insert a NetworkBroadcastExec and scale up the BroadcastExec consumer_tasks.
151        PlanOrNetworkBoundary::Broadcast => {
152            // It would need a network boundary, but on both sides of the boundary there is just 1 task,
153            // so we are fine with not introducing any network boundary.
154            if task_count == 1 && max_child_task_count == Some(1) {
155                return require_one_child(new_children);
156            }
157            let node = Arc::new(NetworkBroadcastExec::try_new(
158                require_one_child(new_children)?,
159                query_id,
160                *stage_id,
161                task_count,
162                max_child_task_count.unwrap_or(1),
163            )?);
164            stage_id.add_assign(1);
165            Ok(node)
166        }
167    }
168}
169
170#[cfg(test)]
171mod tests {
172    use crate::test_utils::in_memory_channel_resolver::InMemoryWorkerResolver;
173    use crate::test_utils::plans::{
174        BuildSideOneTaskEstimator, TestPlanOptions, base_session_builder, context_with_query,
175        sql_to_physical_plan,
176    };
177    use crate::{
178        DistributedExt, DistributedPhysicalOptimizerRule, assert_snapshot, display_plan_ascii,
179    };
180    use datafusion::execution::SessionStateBuilder;
181    use datafusion::physical_plan::displayable;
182    use std::sync::Arc;
183    /* schema for the "weather" table
184
185     MinTemp [type=DOUBLE] [repetitiontype=OPTIONAL]
186     MaxTemp [type=DOUBLE] [repetitiontype=OPTIONAL]
187     Rainfall [type=DOUBLE] [repetitiontype=OPTIONAL]
188     Evaporation [type=DOUBLE] [repetitiontype=OPTIONAL]
189     Sunshine [type=BYTE_ARRAY] [convertedtype=UTF8] [repetitiontype=OPTIONAL]
190     WindGustDir [type=BYTE_ARRAY] [convertedtype=UTF8] [repetitiontype=OPTIONAL]
191     WindGustSpeed [type=BYTE_ARRAY] [convertedtype=UTF8] [repetitiontype=OPTIONAL]
192     WindDir9am [type=BYTE_ARRAY] [convertedtype=UTF8] [repetitiontype=OPTIONAL]
193     WindDir3pm [type=BYTE_ARRAY] [convertedtype=UTF8] [repetitiontype=OPTIONAL]
194     WindSpeed9am [type=BYTE_ARRAY] [convertedtype=UTF8] [repetitiontype=OPTIONAL]
195     WindSpeed3pm [type=INT64] [convertedtype=INT_64] [repetitiontype=OPTIONAL]
196     Humidity9am [type=INT64] [convertedtype=INT_64] [repetitiontype=OPTIONAL]
197     Humidity3pm [type=INT64] [convertedtype=INT_64] [repetitiontype=OPTIONAL]
198     Pressure9am [type=DOUBLE] [repetitiontype=OPTIONAL]
199     Pressure3pm [type=DOUBLE] [repetitiontype=OPTIONAL]
200     Cloud9am [type=INT64] [convertedtype=INT_64] [repetitiontype=OPTIONAL]
201     Cloud3pm [type=INT64] [convertedtype=INT_64] [repetitiontype=OPTIONAL]
202     Temp9am [type=DOUBLE] [repetitiontype=OPTIONAL]
203     Temp3pm [type=DOUBLE] [repetitiontype=OPTIONAL]
204     RainToday [type=BYTE_ARRAY] [convertedtype=UTF8] [repetitiontype=OPTIONAL]
205     RISK_MM [type=DOUBLE] [repetitiontype=OPTIONAL]
206     RainTomorrow [type=BYTE_ARRAY] [convertedtype=UTF8] [repetitiontype=OPTIONAL]
207    */
208
209    #[tokio::test]
210    async fn test_select_all() {
211        let query = r#"
212        SELECT * FROM weather
213        "#;
214        let plan = sql_to_explain(query, |b| {
215            b.with_distributed_worker_resolver(InMemoryWorkerResolver::new(3))
216        })
217        .await;
218        assert_snapshot!(plan, @"DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MinTemp, MaxTemp, Rainfall, Evaporation, Sunshine, WindGustDir, WindGustSpeed, WindDir9am, WindDir3pm, WindSpeed9am, WindSpeed3pm, Humidity9am, Humidity3pm, Pressure9am, Pressure3pm, Cloud9am, Cloud3pm, Temp9am, Temp3pm, RainToday, RISK_MM, RainTomorrow], file_type=parquet");
219    }
220
221    #[tokio::test]
222    async fn test_aggregation() {
223        let query = r#"
224        SELECT count(*), "RainToday" FROM weather GROUP BY "RainToday" ORDER BY count(*)
225        "#;
226        let plan = sql_to_explain(query, |b| {
227            b.with_distributed_worker_resolver(InMemoryWorkerResolver::new(3))
228        })
229        .await;
230        assert_snapshot!(plan, @"
231        ┌───── DistributedExec ── Tasks: t0:[p0] 
232        │ ProjectionExec: expr=[count(*)@0 as count(*), RainToday@1 as RainToday]
233        │   SortPreservingMergeExec: [count(Int64(1))@2 ASC NULLS LAST]
234        │     [Stage 2] => NetworkCoalesceExec: output_partitions=8, input_tasks=2
235        └──────────────────────────────────────────────────
236          ┌───── Stage 2 ── Tasks: t0:[p0..p3] t1:[p0..p3] 
237          │ SortExec: expr=[count(*)@0 ASC NULLS LAST], preserve_partitioning=[true]
238          │   ProjectionExec: expr=[count(Int64(1))@1 as count(*), RainToday@0 as RainToday, count(Int64(1))@1 as count(Int64(1))]
239          │     AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
240          │       [Stage 1] => NetworkShuffleExec: output_partitions=4, input_tasks=3
241          └──────────────────────────────────────────────────
242            ┌───── Stage 1 ── Tasks: t0:[p0..p7] t1:[p0..p7] t2:[p0..p7] 
243            │ RepartitionExec: partitioning=Hash([RainToday@0], 8), input_partitions=1
244            │   AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
245            │     PartitionIsolatorExec: tasks=3 partitions=3
246            │       DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[RainToday], file_type=parquet
247            └──────────────────────────────────────────────────
248        ");
249    }
250
251    #[tokio::test]
252    async fn test_aggregation_with_fewer_workers_than_files() {
253        let query = r#"
254        SELECT count(*), "RainToday" FROM weather GROUP BY "RainToday" ORDER BY count(*)
255        "#;
256        let plan = sql_to_explain(query, |b| {
257            b.with_distributed_worker_resolver(InMemoryWorkerResolver::new(2))
258        })
259        .await;
260        assert_snapshot!(plan, @"
261        ┌───── DistributedExec ── Tasks: t0:[p0] 
262        │ ProjectionExec: expr=[count(*)@0 as count(*), RainToday@1 as RainToday]
263        │   SortPreservingMergeExec: [count(Int64(1))@2 ASC NULLS LAST]
264        │     [Stage 2] => NetworkCoalesceExec: output_partitions=8, input_tasks=2
265        └──────────────────────────────────────────────────
266          ┌───── Stage 2 ── Tasks: t0:[p0..p3] t1:[p0..p3] 
267          │ SortExec: expr=[count(*)@0 ASC NULLS LAST], preserve_partitioning=[true]
268          │   ProjectionExec: expr=[count(Int64(1))@1 as count(*), RainToday@0 as RainToday, count(Int64(1))@1 as count(Int64(1))]
269          │     AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
270          │       [Stage 1] => NetworkShuffleExec: output_partitions=4, input_tasks=2
271          └──────────────────────────────────────────────────
272            ┌───── Stage 1 ── Tasks: t0:[p0..p7] t1:[p0..p7] 
273            │ RepartitionExec: partitioning=Hash([RainToday@0], 8), input_partitions=2
274            │   AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
275            │     PartitionIsolatorExec: tasks=2 partitions=3
276            │       DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[RainToday], file_type=parquet
277            └──────────────────────────────────────────────────
278        ");
279    }
280
281    #[tokio::test]
282    async fn test_aggregation_with_0_workers() {
283        let query = r#"
284        SELECT count(*), "RainToday" FROM weather GROUP BY "RainToday" ORDER BY count(*)
285        "#;
286        let plan = sql_to_explain(query, |b| {
287            b.with_distributed_worker_resolver(InMemoryWorkerResolver::new(0))
288        })
289        .await;
290        assert_snapshot!(plan, @r"
291        ProjectionExec: expr=[count(*)@0 as count(*), RainToday@1 as RainToday]
292          SortPreservingMergeExec: [count(Int64(1))@2 ASC NULLS LAST]
293            SortExec: expr=[count(*)@0 ASC NULLS LAST], preserve_partitioning=[true]
294              ProjectionExec: expr=[count(Int64(1))@1 as count(*), RainToday@0 as RainToday, count(Int64(1))@1 as count(Int64(1))]
295                AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
296                  RepartitionExec: partitioning=Hash([RainToday@0], 4), input_partitions=3
297                    AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
298                      DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[RainToday], file_type=parquet
299        ");
300    }
301
302    #[tokio::test]
303    async fn test_aggregation_with_high_cardinality_factor() {
304        let query = r#"
305        SELECT count(*), "RainToday" FROM weather GROUP BY "RainToday" ORDER BY count(*)
306        "#;
307        let plan = sql_to_explain(query, |b| {
308            b.with_distributed_worker_resolver(InMemoryWorkerResolver::new(3))
309                .with_distributed_cardinality_effect_task_scale_factor(3.0)
310                .unwrap()
311        })
312        .await;
313        assert_snapshot!(plan, @"
314        ┌───── DistributedExec ── Tasks: t0:[p0] 
315        │ ProjectionExec: expr=[count(*)@0 as count(*), RainToday@1 as RainToday]
316        │   SortPreservingMergeExec: [count(Int64(1))@2 ASC NULLS LAST]
317        │     SortExec: expr=[count(*)@0 ASC NULLS LAST], preserve_partitioning=[true]
318        │       ProjectionExec: expr=[count(Int64(1))@1 as count(*), RainToday@0 as RainToday, count(Int64(1))@1 as count(Int64(1))]
319        │         AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
320        │           [Stage 1] => NetworkShuffleExec: output_partitions=4, input_tasks=3
321        └──────────────────────────────────────────────────
322          ┌───── Stage 1 ── Tasks: t0:[p0..p3] t1:[p0..p3] t2:[p0..p3] 
323          │ RepartitionExec: partitioning=Hash([RainToday@0], 4), input_partitions=1
324          │   AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
325          │     PartitionIsolatorExec: tasks=3 partitions=3
326          │       DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[RainToday], file_type=parquet
327          └──────────────────────────────────────────────────
328        ");
329    }
330
331    #[tokio::test]
332    async fn test_aggregation_with_a_lot_of_files_per_task() {
333        let query = r#"
334        SELECT count(*), "RainToday" FROM weather GROUP BY "RainToday" ORDER BY count(*)
335        "#;
336        let plan = sql_to_explain(query, |b| {
337            b.with_distributed_worker_resolver(InMemoryWorkerResolver::new(3))
338                .with_distributed_files_per_task(3)
339                .unwrap()
340        })
341        .await;
342        assert_snapshot!(plan, @r"
343        ProjectionExec: expr=[count(*)@0 as count(*), RainToday@1 as RainToday]
344          SortPreservingMergeExec: [count(Int64(1))@2 ASC NULLS LAST]
345            SortExec: expr=[count(*)@0 ASC NULLS LAST], preserve_partitioning=[true]
346              ProjectionExec: expr=[count(Int64(1))@1 as count(*), RainToday@0 as RainToday, count(Int64(1))@1 as count(Int64(1))]
347                AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
348                  RepartitionExec: partitioning=Hash([RainToday@0], 4), input_partitions=3
349                    AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
350                      DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[RainToday], file_type=parquet
351        ");
352    }
353
354    #[tokio::test]
355    async fn test_aggregation_with_partitions_per_task() {
356        let query = r#"
357        SELECT count(*), "RainToday" FROM weather GROUP BY "RainToday" ORDER BY count(*)
358        "#;
359        let plan = sql_to_explain(query, |b| {
360            b.with_distributed_worker_resolver(InMemoryWorkerResolver::new(3))
361        })
362        .await;
363        assert_snapshot!(plan, @"
364        ┌───── DistributedExec ── Tasks: t0:[p0] 
365        │ ProjectionExec: expr=[count(*)@0 as count(*), RainToday@1 as RainToday]
366        │   SortPreservingMergeExec: [count(Int64(1))@2 ASC NULLS LAST]
367        │     [Stage 2] => NetworkCoalesceExec: output_partitions=8, input_tasks=2
368        └──────────────────────────────────────────────────
369          ┌───── Stage 2 ── Tasks: t0:[p0..p3] t1:[p0..p3] 
370          │ SortExec: expr=[count(*)@0 ASC NULLS LAST], preserve_partitioning=[true]
371          │   ProjectionExec: expr=[count(Int64(1))@1 as count(*), RainToday@0 as RainToday, count(Int64(1))@1 as count(Int64(1))]
372          │     AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
373          │       [Stage 1] => NetworkShuffleExec: output_partitions=4, input_tasks=3
374          └──────────────────────────────────────────────────
375            ┌───── Stage 1 ── Tasks: t0:[p0..p7] t1:[p0..p7] t2:[p0..p7] 
376            │ RepartitionExec: partitioning=Hash([RainToday@0], 8), input_partitions=1
377            │   AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
378            │     PartitionIsolatorExec: tasks=3 partitions=3
379            │       DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[RainToday], file_type=parquet
380            └──────────────────────────────────────────────────
381        ");
382    }
383
384    #[tokio::test]
385    async fn test_left_join() {
386        let query = r#"
387        SELECT a."MinTemp", b."MaxTemp" FROM weather a LEFT JOIN weather b ON a."RainToday" = b."RainToday"
388        "#;
389        let plan = sql_to_explain(query, |b| {
390            b.with_distributed_worker_resolver(InMemoryWorkerResolver::new(3))
391        })
392        .await;
393        assert_snapshot!(plan, @r"
394        HashJoinExec: mode=CollectLeft, join_type=Left, on=[(RainToday@1, RainToday@1)], projection=[MinTemp@0, MaxTemp@2]
395          CoalescePartitionsExec
396            DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MinTemp, RainToday], file_type=parquet
397          DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MaxTemp, RainToday], file_type=parquet
398        ");
399    }
400
401    #[tokio::test]
402    async fn test_left_join_distributed() {
403        let query = r#"
404        WITH a AS (
405            SELECT
406                AVG("MinTemp") as "MinTemp",
407                "RainTomorrow"
408            FROM weather
409            WHERE "RainToday" = 'yes'
410            GROUP BY "RainTomorrow"
411        ), b AS (
412            SELECT
413                AVG("MaxTemp") as "MaxTemp",
414                "RainTomorrow"
415            FROM weather
416            WHERE "RainToday" = 'no'
417            GROUP BY "RainTomorrow"
418        )
419        SELECT
420            a."MinTemp",
421            b."MaxTemp"
422        FROM a
423        LEFT JOIN b
424        ON a."RainTomorrow" = b."RainTomorrow"
425        "#;
426        let plan = sql_to_explain(query, |b| {
427            b.with_distributed_worker_resolver(InMemoryWorkerResolver::new(3))
428        })
429        .await;
430        assert_snapshot!(plan, @"
431        ┌───── DistributedExec ── Tasks: t0:[p0] 
432        │ CoalescePartitionsExec
433        │   HashJoinExec: mode=CollectLeft, join_type=Left, on=[(RainTomorrow@1, RainTomorrow@1)], projection=[MinTemp@0, MaxTemp@2]
434        │     CoalescePartitionsExec
435        │       [Stage 2] => NetworkCoalesceExec: output_partitions=8, input_tasks=2
436        │     ProjectionExec: expr=[avg(weather.MaxTemp)@1 as MaxTemp, RainTomorrow@0 as RainTomorrow]
437        │       AggregateExec: mode=FinalPartitioned, gby=[RainTomorrow@0 as RainTomorrow], aggr=[avg(weather.MaxTemp)]
438        │         [Stage 3] => NetworkShuffleExec: output_partitions=4, input_tasks=3
439        └──────────────────────────────────────────────────
440          ┌───── Stage 2 ── Tasks: t0:[p0..p3] t1:[p0..p3] 
441          │ ProjectionExec: expr=[avg(weather.MinTemp)@1 as MinTemp, RainTomorrow@0 as RainTomorrow]
442          │   AggregateExec: mode=FinalPartitioned, gby=[RainTomorrow@0 as RainTomorrow], aggr=[avg(weather.MinTemp)]
443          │     [Stage 1] => NetworkShuffleExec: output_partitions=4, input_tasks=3
444          └──────────────────────────────────────────────────
445            ┌───── Stage 1 ── Tasks: t0:[p0..p7] t1:[p0..p7] t2:[p0..p7] 
446            │ RepartitionExec: partitioning=Hash([RainTomorrow@0], 8), input_partitions=4
447            │   AggregateExec: mode=Partial, gby=[RainTomorrow@1 as RainTomorrow], aggr=[avg(weather.MinTemp)]
448            │     FilterExec: RainToday@1 = yes, projection=[MinTemp@0, RainTomorrow@2]
449            │       RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
450            │         PartitionIsolatorExec: tasks=3 partitions=3
451            │           DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MinTemp, RainToday, RainTomorrow], file_type=parquet, predicate=RainToday@19 = yes, pruning_predicate=RainToday_null_count@2 != row_count@3 AND RainToday_min@0 <= yes AND yes <= RainToday_max@1, required_guarantees=[RainToday in (yes)]
452            └──────────────────────────────────────────────────
453          ┌───── Stage 3 ── Tasks: t0:[p0..p3] t1:[p0..p3] t2:[p0..p3] 
454          │ RepartitionExec: partitioning=Hash([RainTomorrow@0], 4), input_partitions=4
455          │   AggregateExec: mode=Partial, gby=[RainTomorrow@1 as RainTomorrow], aggr=[avg(weather.MaxTemp)]
456          │     FilterExec: RainToday@1 = no, projection=[MaxTemp@0, RainTomorrow@2]
457          │       RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
458          │         PartitionIsolatorExec: tasks=3 partitions=3
459          │           DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MaxTemp, RainToday, RainTomorrow], file_type=parquet, predicate=RainToday@19 = no, pruning_predicate=RainToday_null_count@2 != row_count@3 AND RainToday_min@0 <= no AND no <= RainToday_max@1, required_guarantees=[RainToday in (no)]
460          └──────────────────────────────────────────────────
461        ");
462    }
463
464    #[tokio::test]
465    async fn test_sort() {
466        let query = r#"
467        SELECT * FROM weather ORDER BY "MinTemp" DESC
468        "#;
469        let plan = sql_to_explain(query, |b| {
470            b.with_distributed_worker_resolver(InMemoryWorkerResolver::new(3))
471        })
472        .await;
473        assert_snapshot!(plan, @"
474        ┌───── DistributedExec ── Tasks: t0:[p0] 
475        │ SortPreservingMergeExec: [MinTemp@0 DESC]
476        │   [Stage 1] => NetworkCoalesceExec: output_partitions=3, input_tasks=3
477        └──────────────────────────────────────────────────
478          ┌───── Stage 1 ── Tasks: t0:[p0] t1:[p1] t2:[p2] 
479          │ SortExec: expr=[MinTemp@0 DESC], preserve_partitioning=[true]
480          │   PartitionIsolatorExec: tasks=3 partitions=3
481          │     DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MinTemp, MaxTemp, Rainfall, Evaporation, Sunshine, WindGustDir, WindGustSpeed, WindDir9am, WindDir3pm, WindSpeed9am, WindSpeed3pm, Humidity9am, Humidity3pm, Pressure9am, Pressure3pm, Cloud9am, Cloud3pm, Temp9am, Temp3pm, RainToday, RISK_MM, RainTomorrow], file_type=parquet
482          └──────────────────────────────────────────────────
483        ");
484    }
485
486    #[tokio::test]
487    async fn test_distinct() {
488        let query = r#"
489        SELECT DISTINCT "RainToday", "WindGustDir" FROM weather
490        "#;
491        let plan = sql_to_explain(query, |b| {
492            b.with_distributed_worker_resolver(InMemoryWorkerResolver::new(3))
493        })
494        .await;
495        assert_snapshot!(plan, @"
496        ┌───── DistributedExec ── Tasks: t0:[p0] 
497        │ CoalescePartitionsExec
498        │   [Stage 2] => NetworkCoalesceExec: output_partitions=8, input_tasks=2
499        └──────────────────────────────────────────────────
500          ┌───── Stage 2 ── Tasks: t0:[p0..p3] t1:[p0..p3] 
501          │ AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday, WindGustDir@1 as WindGustDir], aggr=[]
502          │   [Stage 1] => NetworkShuffleExec: output_partitions=4, input_tasks=3
503          └──────────────────────────────────────────────────
504            ┌───── Stage 1 ── Tasks: t0:[p0..p7] t1:[p0..p7] t2:[p0..p7] 
505            │ RepartitionExec: partitioning=Hash([RainToday@0, WindGustDir@1], 8), input_partitions=1
506            │   AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday, WindGustDir@1 as WindGustDir], aggr=[]
507            │     PartitionIsolatorExec: tasks=3 partitions=3
508            │       DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[RainToday, WindGustDir], file_type=parquet
509            └──────────────────────────────────────────────────
510        ");
511    }
512
513    #[tokio::test]
514    async fn test_show_columns() {
515        let query = r#"
516        SHOW COLUMNS from weather
517        "#;
518        let plan = sql_to_explain(query, |b| {
519            b.with_distributed_worker_resolver(InMemoryWorkerResolver::new(3))
520        })
521        .await;
522        assert_snapshot!(plan, @r"
523        ProjectionExec: expr=[table_catalog@0 as table_catalog, table_schema@1 as table_schema, table_name@2 as table_name, column_name@3 as column_name, data_type@5 as data_type, is_nullable@4 as is_nullable]
524          FilterExec: table_name@2 = weather
525            RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
526              StreamingTableExec: partition_sizes=1, projection=[table_catalog, table_schema, table_name, column_name, is_nullable, data_type]
527        ");
528    }
529
530    #[tokio::test]
531    async fn test_limited_by_worker() {
532        let query = r#"
533        SET datafusion.execution.target_partitions=2;
534        SELECT 1 FROM weather
535        UNION ALL
536        SELECT 1 FROM flights_1m
537        "#;
538        let plan = sql_to_explain(query, |b| {
539            b.with_distributed_worker_resolver(InMemoryWorkerResolver::new(2))
540        })
541        .await;
542        assert_snapshot!(plan, @r"
543        ┌───── DistributedExec ── Tasks: t0:[p0] 
544        │ CoalescePartitionsExec
545        │   [Stage 1] => NetworkCoalesceExec: output_partitions=4, input_tasks=2
546        └──────────────────────────────────────────────────
547          ┌───── Stage 1 ── Tasks: t0:[p0..p1] t1:[p2..p3] 
548          │ DistributedUnionExec: t0:[c0] t1:[c1]
549          │   DataSourceExec: file_groups={2 groups: [[/testdata/weather/result-000000.parquet, /testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[1 as Int64(1)], file_type=parquet
550          │   DataSourceExec: file_groups={1 group: [[/testdata/flights-1m.parquet]]}, projection=[1 as Int64(1)], file_type=parquet
551          └──────────────────────────────────────────────────
552        ");
553    }
554
555    #[tokio::test]
556    async fn test_limited_by_config() {
557        let query = r#"
558        SET distributed.max_tasks_per_stage=2;
559        SELECT 1 FROM weather
560        UNION ALL
561        SELECT 1 FROM flights_1m
562        "#;
563        let plan = sql_to_explain(query, |b| {
564            b.with_distributed_worker_resolver(InMemoryWorkerResolver::new(3))
565        })
566        .await;
567        assert_snapshot!(plan, @r"
568        ┌───── DistributedExec ── Tasks: t0:[p0] 
569        │ CoalescePartitionsExec
570        │   [Stage 1] => NetworkCoalesceExec: output_partitions=6, input_tasks=2
571        └──────────────────────────────────────────────────
572          ┌───── Stage 1 ── Tasks: t0:[p0..p2] t1:[p3..p5] 
573          │ DistributedUnionExec: t0:[c0] t1:[c1]
574          │   DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[1 as Int64(1)], file_type=parquet
575          │   DataSourceExec: file_groups={1 group: [[/testdata/flights-1m.parquet]]}, projection=[1 as Int64(1)], file_type=parquet
576          └──────────────────────────────────────────────────
577        ");
578    }
579
580    #[tokio::test]
581    async fn test_unioning_2_tables() {
582        let query = r#"
583        set distributed.children_isolator_unions=true;
584        SELECT "MinTemp", "RainToday" FROM weather WHERE "MinTemp" > 10.0
585        UNION ALL
586        SELECT "MaxTemp", "RainToday" FROM weather WHERE "MaxTemp" < 30.0
587        "#;
588        let plan = sql_to_explain(query, |b| {
589            b.with_distributed_worker_resolver(InMemoryWorkerResolver::new(6))
590        })
591        .await;
592        assert_snapshot!(plan, @"
593        ┌───── DistributedExec ── Tasks: t0:[p0] 
594        │ CoalescePartitionsExec
595        │   [Stage 1] => NetworkCoalesceExec: output_partitions=24, input_tasks=6
596        └──────────────────────────────────────────────────
597          ┌───── Stage 1 ── Tasks: t0:[p0..p3] t1:[p4..p7] t2:[p8..p11] t3:[p12..p15] t4:[p16..p19] t5:[p20..p23] 
598          │ DistributedUnionExec: t0:[c0(0/3)] t1:[c0(1/3)] t2:[c0(2/3)] t3:[c1(0/3)] t4:[c1(1/3)] t5:[c1(2/3)]
599          │   FilterExec: MinTemp@0 > 10
600          │     RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
601          │       PartitionIsolatorExec: tasks=3 partitions=3
602          │         DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MinTemp, RainToday], file_type=parquet, predicate=MinTemp@0 > 10, pruning_predicate=MinTemp_null_count@1 != row_count@2 AND MinTemp_max@0 > 10, required_guarantees=[]
603          │   ProjectionExec: expr=[MaxTemp@0 as MinTemp, RainToday@1 as RainToday]
604          │     FilterExec: MaxTemp@0 < 30
605          │       RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
606          │         PartitionIsolatorExec: tasks=3 partitions=3
607          │           DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MaxTemp, RainToday], file_type=parquet, predicate=MaxTemp@1 < 30, pruning_predicate=MaxTemp_null_count@1 != row_count@2 AND MaxTemp_min@0 < 30, required_guarantees=[]
608          └──────────────────────────────────────────────────
609        ");
610    }
611
612    #[tokio::test]
613    async fn test_unioning_2_tables_limited_workers() {
614        let query = r#"
615        set distributed.children_isolator_unions=true;
616        SELECT "MinTemp", "RainToday" FROM weather WHERE "MinTemp" > 10.0
617        UNION ALL
618        SELECT "MaxTemp", "RainToday" FROM weather WHERE "MaxTemp" < 30.0
619        "#;
620        let plan = sql_to_explain(query, |b| {
621            b.with_distributed_worker_resolver(InMemoryWorkerResolver::new(3))
622        })
623        .await;
624        assert_snapshot!(plan, @"
625        ┌───── DistributedExec ── Tasks: t0:[p0] 
626        │ CoalescePartitionsExec
627        │   [Stage 1] => NetworkCoalesceExec: output_partitions=12, input_tasks=3
628        └──────────────────────────────────────────────────
629          ┌───── Stage 1 ── Tasks: t0:[p0..p3] t1:[p4..p7] t2:[p8..p11] 
630          │ DistributedUnionExec: t0:[c0] t1:[c1(0/2)] t2:[c1(1/2)]
631          │   FilterExec: MinTemp@0 > 10
632          │     RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=3
633          │       DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MinTemp, RainToday], file_type=parquet, predicate=MinTemp@0 > 10, pruning_predicate=MinTemp_null_count@1 != row_count@2 AND MinTemp_max@0 > 10, required_guarantees=[]
634          │   ProjectionExec: expr=[MaxTemp@0 as MinTemp, RainToday@1 as RainToday]
635          │     FilterExec: MaxTemp@0 < 30
636          │       RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2
637          │         PartitionIsolatorExec: tasks=2 partitions=3
638          │           DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MaxTemp, RainToday], file_type=parquet, predicate=MaxTemp@1 < 30, pruning_predicate=MaxTemp_null_count@1 != row_count@2 AND MaxTemp_min@0 < 30, required_guarantees=[]
639          └──────────────────────────────────────────────────
640        ");
641    }
642
643    #[tokio::test]
644    async fn test_unioning_3_tables() {
645        let query = r#"
646        set distributed.children_isolator_unions=true;
647        SELECT "MinTemp", "RainToday" FROM weather WHERE "MinTemp" > 10.0
648        UNION ALL
649        SELECT "MaxTemp", "RainToday" FROM weather WHERE "MaxTemp" < 30.0
650        UNION ALL
651        SELECT "Temp9am", "RainToday" FROM weather WHERE "Temp9am" > 15.0
652        "#;
653        let plan = sql_to_explain(query, |b| {
654            b.with_distributed_worker_resolver(InMemoryWorkerResolver::new(3))
655        })
656        .await;
657        assert_snapshot!(plan, @r"
658        ┌───── DistributedExec ── Tasks: t0:[p0] 
659        │ CoalescePartitionsExec
660        │   [Stage 1] => NetworkCoalesceExec: output_partitions=12, input_tasks=3
661        └──────────────────────────────────────────────────
662          ┌───── Stage 1 ── Tasks: t0:[p0..p3] t1:[p4..p7] t2:[p8..p11] 
663          │ DistributedUnionExec: t0:[c0] t1:[c1] t2:[c2]
664          │   FilterExec: MinTemp@0 > 10
665          │     RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=3
666          │       DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MinTemp, RainToday], file_type=parquet, predicate=MinTemp@0 > 10, pruning_predicate=MinTemp_null_count@1 != row_count@2 AND MinTemp_max@0 > 10, required_guarantees=[]
667          │   ProjectionExec: expr=[MaxTemp@0 as MinTemp, RainToday@1 as RainToday]
668          │     FilterExec: MaxTemp@0 < 30
669          │       RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=3
670          │         DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MaxTemp, RainToday], file_type=parquet, predicate=MaxTemp@1 < 30, pruning_predicate=MaxTemp_null_count@1 != row_count@2 AND MaxTemp_min@0 < 30, required_guarantees=[]
671          │   ProjectionExec: expr=[Temp9am@0 as MinTemp, RainToday@1 as RainToday]
672          │     FilterExec: Temp9am@0 > 15
673          │       RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=3
674          │         DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[Temp9am, RainToday], file_type=parquet, predicate=Temp9am@17 > 15, pruning_predicate=Temp9am_null_count@1 != row_count@2 AND Temp9am_max@0 > 15, required_guarantees=[]
675          └──────────────────────────────────────────────────
676        ");
677    }
678
679    #[tokio::test]
680    async fn test_unioning_5_tables() {
681        let query = r#"
682        set distributed.children_isolator_unions=true;
683        SELECT "MinTemp", "RainToday" FROM weather WHERE "MinTemp" > 10.0
684        UNION ALL
685        SELECT "MaxTemp", "RainToday" FROM weather WHERE "MaxTemp" < 30.0
686        UNION ALL
687        SELECT "Temp9am", "RainToday" FROM weather WHERE "Temp9am" > 15.0
688        UNION ALL
689        SELECT "Temp3pm", "RainToday" FROM weather WHERE "Temp3pm" < 25.0
690        UNION ALL
691        SELECT "Rainfall", "RainToday" FROM weather WHERE "Rainfall" > 5.0
692        "#;
693        let plan = sql_to_explain(query, |b| {
694            b.with_distributed_worker_resolver(InMemoryWorkerResolver::new(3))
695        })
696        .await;
697        assert_snapshot!(plan, @r"
698        ┌───── DistributedExec ── Tasks: t0:[p0] 
699        │ CoalescePartitionsExec
700        │   [Stage 1] => NetworkCoalesceExec: output_partitions=24, input_tasks=3
701        └──────────────────────────────────────────────────
702          ┌───── Stage 1 ── Tasks: t0:[p0..p7] t1:[p8..p15] t2:[p16..p23] 
703          │ DistributedUnionExec: t0:[c0, c1] t1:[c2, c3] t2:[c4]
704          │   FilterExec: MinTemp@0 > 10
705          │     RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=3
706          │       DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MinTemp, RainToday], file_type=parquet, predicate=MinTemp@0 > 10, pruning_predicate=MinTemp_null_count@1 != row_count@2 AND MinTemp_max@0 > 10, required_guarantees=[]
707          │   ProjectionExec: expr=[MaxTemp@0 as MinTemp, RainToday@1 as RainToday]
708          │     FilterExec: MaxTemp@0 < 30
709          │       RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=3
710          │         DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MaxTemp, RainToday], file_type=parquet, predicate=MaxTemp@1 < 30, pruning_predicate=MaxTemp_null_count@1 != row_count@2 AND MaxTemp_min@0 < 30, required_guarantees=[]
711          │   ProjectionExec: expr=[Temp9am@0 as MinTemp, RainToday@1 as RainToday]
712          │     FilterExec: Temp9am@0 > 15
713          │       RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=3
714          │         DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[Temp9am, RainToday], file_type=parquet, predicate=Temp9am@17 > 15, pruning_predicate=Temp9am_null_count@1 != row_count@2 AND Temp9am_max@0 > 15, required_guarantees=[]
715          │   ProjectionExec: expr=[Temp3pm@0 as MinTemp, RainToday@1 as RainToday]
716          │     FilterExec: Temp3pm@0 < 25
717          │       RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=3
718          │         DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[Temp3pm, RainToday], file_type=parquet, predicate=Temp3pm@18 < 25, pruning_predicate=Temp3pm_null_count@1 != row_count@2 AND Temp3pm_min@0 < 25, required_guarantees=[]
719          │   ProjectionExec: expr=[Rainfall@0 as MinTemp, RainToday@1 as RainToday]
720          │     FilterExec: Rainfall@0 > 5
721          │       RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=3
722          │         DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[Rainfall, RainToday], file_type=parquet, predicate=Rainfall@2 > 5, pruning_predicate=Rainfall_null_count@1 != row_count@2 AND Rainfall_max@0 > 5, required_guarantees=[]
723          └──────────────────────────────────────────────────
724        ");
725    }
726
727    #[tokio::test]
728    async fn test_broadcast_join() {
729        let query = r#"
730        SELECT a."MinTemp", b."MaxTemp"
731        FROM weather a INNER JOIN weather b
732        ON a."RainToday" = b."RainToday"
733        "#;
734        let annotated = sql_to_explain_with_broadcast(query, 3, true).await;
735        assert_snapshot!(annotated, @"
736        ┌───── DistributedExec ── Tasks: t0:[p0] 
737        │ CoalescePartitionsExec
738        │   [Stage 2] => NetworkCoalesceExec: output_partitions=3, input_tasks=3
739        └──────────────────────────────────────────────────
740          ┌───── Stage 2 ── Tasks: t0:[p0] t1:[p1] t2:[p2] 
741          │ HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(RainToday@1, RainToday@1)], projection=[MinTemp@0, MaxTemp@2]
742          │   CoalescePartitionsExec
743          │     [Stage 1] => NetworkBroadcastExec: partitions_per_consumer=1, stage_partitions=3, input_tasks=3
744          │   PartitionIsolatorExec: tasks=3 partitions=3
745          │     DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MaxTemp, RainToday], file_type=parquet, predicate=DynamicFilter [ empty ]
746          └──────────────────────────────────────────────────
747            ┌───── Stage 1 ── Tasks: t0:[p0..p2] t1:[p3..p5] t2:[p6..p8] 
748            │ BroadcastExec: input_partitions=1, consumer_tasks=3, output_partitions=3
749            │   PartitionIsolatorExec: tasks=3 partitions=3
750            │     DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MinTemp, RainToday], file_type=parquet
751            └──────────────────────────────────────────────────
752        ")
753    }
754
755    #[tokio::test]
756    async fn test_broadcast_nested_joins() {
757        let query = r#"
758        SELECT a."MinTemp", b."MaxTemp", c."Rainfall"
759        FROM weather a
760        INNER JOIN weather b ON a."RainToday" = b."RainToday"
761        INNER JOIN weather c ON b."RainToday" = c."RainToday"
762        "#;
763        let plan = sql_to_explain_with_broadcast(query, 3, true).await;
764        assert_snapshot!(plan, @"
765        ┌───── DistributedExec ── Tasks: t0:[p0] 
766        │ CoalescePartitionsExec
767        │   [Stage 3] => NetworkCoalesceExec: output_partitions=3, input_tasks=3
768        └──────────────────────────────────────────────────
769          ┌───── Stage 3 ── Tasks: t0:[p0] t1:[p1] t2:[p2] 
770          │ HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(RainToday@2, RainToday@1)], projection=[MinTemp@0, MaxTemp@1, Rainfall@3]
771          │   CoalescePartitionsExec
772          │     [Stage 2] => NetworkBroadcastExec: partitions_per_consumer=1, stage_partitions=3, input_tasks=3
773          │   PartitionIsolatorExec: tasks=3 partitions=3
774          │     DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[Rainfall, RainToday], file_type=parquet, predicate=DynamicFilter [ empty ]
775          └──────────────────────────────────────────────────
776            ┌───── Stage 2 ── Tasks: t0:[p0..p2] t1:[p3..p5] t2:[p6..p8] 
777            │ BroadcastExec: input_partitions=1, consumer_tasks=3, output_partitions=3
778            │   HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(RainToday@1, RainToday@1)], projection=[MinTemp@0, MaxTemp@2, RainToday@3]
779            │     CoalescePartitionsExec
780            │       [Stage 1] => NetworkBroadcastExec: partitions_per_consumer=1, stage_partitions=3, input_tasks=3
781            │     PartitionIsolatorExec: tasks=3 partitions=3
782            │       DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MaxTemp, RainToday], file_type=parquet, predicate=DynamicFilter [ empty ]
783            └──────────────────────────────────────────────────
784              ┌───── Stage 1 ── Tasks: t0:[p0..p2] t1:[p3..p5] t2:[p6..p8] 
785              │ BroadcastExec: input_partitions=1, consumer_tasks=3, output_partitions=3
786              │   PartitionIsolatorExec: tasks=3 partitions=3
787              │     DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MinTemp, RainToday], file_type=parquet
788              └──────────────────────────────────────────────────
789        ")
790    }
791
792    #[tokio::test]
793    async fn test_broadcast_datasource_as_build_child() {
794        let query = r#"
795        SELECT a."MinTemp", b."MaxTemp"
796        FROM weather a INNER JOIN weather b
797        ON a."RainToday" = b."RainToday"
798        "#;
799
800        let physical_plan = sql_to_physical_plan(query, 4, 3).await;
801        assert_snapshot!(physical_plan, @r"
802        HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(RainToday@1, RainToday@1)], projection=[MinTemp@0, MaxTemp@2]
803          CoalescePartitionsExec
804            DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MinTemp, RainToday], file_type=parquet
805          DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MaxTemp, RainToday], file_type=parquet, predicate=DynamicFilter [ empty ]
806        ");
807
808        let plan = sql_to_explain_with_broadcast(query, 3, true).await;
809        assert_snapshot!(plan, @"
810        ┌───── DistributedExec ── Tasks: t0:[p0] 
811        │ CoalescePartitionsExec
812        │   [Stage 2] => NetworkCoalesceExec: output_partitions=3, input_tasks=3
813        └──────────────────────────────────────────────────
814          ┌───── Stage 2 ── Tasks: t0:[p0] t1:[p1] t2:[p2] 
815          │ HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(RainToday@1, RainToday@1)], projection=[MinTemp@0, MaxTemp@2]
816          │   CoalescePartitionsExec
817          │     [Stage 1] => NetworkBroadcastExec: partitions_per_consumer=1, stage_partitions=3, input_tasks=3
818          │   PartitionIsolatorExec: tasks=3 partitions=3
819          │     DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MaxTemp, RainToday], file_type=parquet, predicate=DynamicFilter [ empty ]
820          └──────────────────────────────────────────────────
821            ┌───── Stage 1 ── Tasks: t0:[p0..p2] t1:[p3..p5] t2:[p6..p8] 
822            │ BroadcastExec: input_partitions=1, consumer_tasks=3, output_partitions=3
823            │   PartitionIsolatorExec: tasks=3 partitions=3
824            │     DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MinTemp, RainToday], file_type=parquet
825            └──────────────────────────────────────────────────
826        ")
827    }
828
829    #[tokio::test]
830    async fn test_broadcast_union_children_isolator_plan() {
831        let query = r#"
832        SET distributed.children_isolator_unions = true;
833        SELECT a."MinTemp", b."MaxTemp"
834        FROM weather a INNER JOIN weather b
835        ON a."RainToday" = b."RainToday"
836        UNION ALL
837        SELECT a."MinTemp", b."MaxTemp"
838        FROM weather a INNER JOIN weather b
839        ON a."RainToday" = b."RainToday"
840        UNION ALL
841        SELECT a."MinTemp", b."MaxTemp"
842        FROM weather a INNER JOIN weather b
843        ON a."RainToday" = b."RainToday"
844        "#;
845        let plan = sql_to_explain_with_broadcast(query, 3, true).await;
846        assert_snapshot!(plan, @r"
847        ┌───── DistributedExec ── Tasks: t0:[p0] 
848        │ CoalescePartitionsExec
849        │   [Stage 1] => NetworkCoalesceExec: output_partitions=9, input_tasks=3
850        └──────────────────────────────────────────────────
851          ┌───── Stage 1 ── Tasks: t0:[p0..p2] t1:[p3..p5] t2:[p6..p8] 
852          │ DistributedUnionExec: t0:[c0] t1:[c1] t2:[c2]
853          │   HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(RainToday@1, RainToday@1)], projection=[MinTemp@0, MaxTemp@2]
854          │     CoalescePartitionsExec
855          │       BroadcastExec: input_partitions=3, consumer_tasks=1, output_partitions=3
856          │         DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MinTemp, RainToday], file_type=parquet
857          │     DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MaxTemp, RainToday], file_type=parquet, predicate=DynamicFilter [ empty ]
858          │   HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(RainToday@1, RainToday@1)], projection=[MinTemp@0, MaxTemp@2]
859          │     CoalescePartitionsExec
860          │       BroadcastExec: input_partitions=3, consumer_tasks=1, output_partitions=3
861          │         DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MinTemp, RainToday], file_type=parquet
862          │     DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MaxTemp, RainToday], file_type=parquet, predicate=DynamicFilter [ empty ]
863          │   HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(RainToday@1, RainToday@1)], projection=[MinTemp@0, MaxTemp@2]
864          │     CoalescePartitionsExec
865          │       BroadcastExec: input_partitions=3, consumer_tasks=1, output_partitions=3
866          │         DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MinTemp, RainToday], file_type=parquet
867          │     DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MaxTemp, RainToday], file_type=parquet, predicate=DynamicFilter [ empty ]
868          └──────────────────────────────────────────────────
869        ");
870    }
871
872    #[tokio::test]
873    async fn test_broadcast_one_to_many_plan() {
874        let query = r#"
875        SELECT a."MinTemp", b."MaxTemp"
876        FROM weather a INNER JOIN weather b
877        ON a."RainToday" = b."RainToday"
878        "#;
879        let plan = sql_to_explain_with_broadcast_one_to_many(query, 3).await;
880        assert_snapshot!(plan, @"
881        ┌───── DistributedExec ── Tasks: t0:[p0] 
882        │ CoalescePartitionsExec
883        │   [Stage 2] => NetworkCoalesceExec: output_partitions=3, input_tasks=3
884        └──────────────────────────────────────────────────
885          ┌───── Stage 2 ── Tasks: t0:[p0] t1:[p1] t2:[p2] 
886          │ HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(RainToday@1, RainToday@1)], projection=[MinTemp@0, MaxTemp@2]
887          │   CoalescePartitionsExec
888          │     [Stage 1] => NetworkBroadcastExec: partitions_per_consumer=3, stage_partitions=9, input_tasks=1
889          │   PartitionIsolatorExec: tasks=3 partitions=3
890          │     DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MaxTemp, RainToday], file_type=parquet, predicate=DynamicFilter [ empty ]
891          └──────────────────────────────────────────────────
892            ┌───── Stage 1 ── Tasks: t0:[p0..p8] 
893            │ BroadcastExec: input_partitions=3, consumer_tasks=3, output_partitions=9
894            │   DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MinTemp, RainToday], file_type=parquet
895            └──────────────────────────────────────────────────
896        ");
897    }
898
899    async fn sql_to_explain(
900        query: &str,
901        f: impl FnOnce(SessionStateBuilder) -> SessionStateBuilder,
902    ) -> String {
903        explain_test_plan(query, TestPlanOptions::default(), true, f).await
904    }
905
906    async fn sql_to_explain_with_broadcast(
907        query: &str,
908        num_workers: usize,
909        broadcast_enabled: bool,
910    ) -> String {
911        sql_to_plan_with_options(query, num_workers, broadcast_enabled, true).await
912    }
913
914    async fn sql_to_explain_with_broadcast_one_to_many(query: &str, num_workers: usize) -> String {
915        let options = TestPlanOptions {
916            target_partitions: 4,
917            num_workers,
918            broadcast_enabled: true,
919        };
920        explain_test_plan(query, options, true, |b| {
921            b.with_distributed_task_estimator(BuildSideOneTaskEstimator)
922        })
923        .await
924    }
925
926    async fn sql_to_plan_with_options(
927        query: &str,
928        num_workers: usize,
929        broadcast_enabled: bool,
930        use_optimizer: bool,
931    ) -> String {
932        let options = TestPlanOptions {
933            target_partitions: 4,
934            num_workers,
935            broadcast_enabled,
936        };
937        explain_test_plan(query, options, use_optimizer, |b| b).await
938    }
939
940    async fn explain_test_plan(
941        query: &str,
942        options: TestPlanOptions,
943        use_optimizer: bool,
944        configure: impl FnOnce(SessionStateBuilder) -> SessionStateBuilder,
945    ) -> String {
946        let mut builder = base_session_builder(
947            options.target_partitions,
948            options.num_workers,
949            options.broadcast_enabled,
950        );
951        if use_optimizer {
952            builder =
953                builder.with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule));
954        }
955        let builder = configure(builder);
956        let (ctx, query) = context_with_query(builder, query).await;
957        let df = ctx.sql(&query).await.unwrap();
958        let physical_plan = df.create_physical_plan().await.unwrap();
959
960        if use_optimizer {
961            display_plan_ascii(physical_plan.as_ref(), false)
962        } else {
963            format!("{}", displayable(physical_plan.as_ref()).indent(true))
964        }
965    }
966}