1use crate::common::require_one_child;
2use crate::distributed_planner::batch_coalescing_below_network_boundaries;
3use crate::distributed_planner::plan_annotator::{
4 AnnotatedPlan, PlanOrNetworkBoundary, annotate_plan,
5};
6use crate::{
7 DistributedConfig, DistributedExec, NetworkBroadcastExec, NetworkCoalesceExec,
8 NetworkShuffleExec, TaskEstimator,
9};
10use datafusion::config::ConfigOptions;
11use datafusion::error::DataFusionError;
12use datafusion::physical_optimizer::PhysicalOptimizerRule;
13use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
14use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties};
15use std::fmt::Debug;
16use std::ops::AddAssign;
17use std::sync::Arc;
18use uuid::Uuid;
19
20use super::insert_broadcast::insert_broadcast_execs;
21
/// Physical optimizer rule that rewrites a single-node physical plan into a distributed one.
///
/// When applied, the plan is annotated with task counts and network boundaries, each boundary is
/// materialized as a `NetworkShuffleExec`, `NetworkCoalesceExec` or `NetworkBroadcastExec`, and
/// the result is wrapped in a [`DistributedExec`]. If no network boundary ends up being created,
/// the input plan is returned untouched.
#[derive(Debug, Default)]
pub struct DistributedPhysicalOptimizerRule;
41
42impl PhysicalOptimizerRule for DistributedPhysicalOptimizerRule {
43 fn optimize(
44 &self,
45 original: Arc<dyn ExecutionPlan>,
46 cfg: &ConfigOptions,
47 ) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
48 if original.as_any().is::<DistributedExec>() {
49 return Ok(original);
50 }
51
52 let mut plan = Arc::clone(&original);
53 if original.output_partitioning().partition_count() > 1 {
54 plan = Arc::new(CoalescePartitionsExec::new(plan))
55 }
56
57 plan = insert_broadcast_execs(plan, cfg)?;
58
59 let annotated = annotate_plan(plan, cfg)?;
60
61 let mut stage_id = 1;
62 let distributed = distribute_plan(annotated, cfg, Uuid::new_v4(), &mut stage_id)?;
63 if stage_id == 1 {
64 return Ok(original);
65 }
66 let distributed = batch_coalescing_below_network_boundaries(distributed, cfg)?;
67
68 Ok(Arc::new(DistributedExec::new(distributed)))
69 }
70
71 fn name(&self) -> &str {
72 "DistributedPhysicalOptimizer"
73 }
74
75 fn schema_check(&self) -> bool {
76 true
77 }
78}
79
80fn distribute_plan(
88 annotated_plan: AnnotatedPlan,
89 cfg: &ConfigOptions,
90 query_id: Uuid,
91 stage_id: &mut usize,
92) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
93 let d_cfg = DistributedConfig::from_config_options(cfg)?;
94 let children = annotated_plan.children;
95 let task_count = annotated_plan.task_count.as_usize();
96 let max_child_task_count = children.iter().map(|v| v.task_count.as_usize()).max();
97 let new_children = children
98 .into_iter()
99 .map(|child| distribute_plan(child, cfg, query_id, stage_id))
100 .collect::<Result<Vec<_>, _>>()?;
101 match annotated_plan.plan_or_nb {
102 PlanOrNetworkBoundary::Plan(plan) if plan.children().is_empty() => {
105 let scaled_up = d_cfg.__private_task_estimator.scale_up_leaf_node(
106 &plan,
107 annotated_plan.task_count.as_usize(),
108 cfg,
109 );
110 Ok(scaled_up.unwrap_or(plan))
111 }
112 PlanOrNetworkBoundary::Plan(plan) => plan.with_new_children(new_children),
114 PlanOrNetworkBoundary::Shuffle => {
116 if task_count == 1 && max_child_task_count == Some(1) {
119 return require_one_child(new_children);
120 }
121 let node = Arc::new(NetworkShuffleExec::try_new(
122 require_one_child(new_children)?,
123 query_id,
124 *stage_id,
125 task_count,
126 max_child_task_count.unwrap_or(1),
127 )?);
128 stage_id.add_assign(1);
129 Ok(node)
130 }
131 PlanOrNetworkBoundary::Coalesce => {
134 if task_count == 1 && max_child_task_count == Some(1) {
137 return require_one_child(new_children);
138 }
139 let node = Arc::new(NetworkCoalesceExec::try_new(
140 require_one_child(new_children)?,
141 query_id,
142 *stage_id,
143 task_count,
144 max_child_task_count.unwrap_or(1),
145 )?);
146 stage_id.add_assign(1);
147 Ok(node)
148 }
149 PlanOrNetworkBoundary::Broadcast => {
152 if task_count == 1 && max_child_task_count == Some(1) {
155 return require_one_child(new_children);
156 }
157 let node = Arc::new(NetworkBroadcastExec::try_new(
158 require_one_child(new_children)?,
159 query_id,
160 *stage_id,
161 task_count,
162 max_child_task_count.unwrap_or(1),
163 )?);
164 stage_id.add_assign(1);
165 Ok(node)
166 }
167 }
168}
169
170#[cfg(test)]
171mod tests {
172 use crate::test_utils::in_memory_channel_resolver::InMemoryWorkerResolver;
173 use crate::test_utils::plans::{
174 BuildSideOneTaskEstimator, TestPlanOptions, base_session_builder, context_with_query,
175 sql_to_physical_plan,
176 };
177 use crate::{
178 DistributedExt, DistributedPhysicalOptimizerRule, assert_snapshot, display_plan_ascii,
179 };
180 use datafusion::execution::SessionStateBuilder;
181 use datafusion::physical_plan::displayable;
182 use std::sync::Arc;
183 #[tokio::test]
210 async fn test_select_all() {
211 let query = r#"
212 SELECT * FROM weather
213 "#;
214 let plan = sql_to_explain(query, |b| {
215 b.with_distributed_worker_resolver(InMemoryWorkerResolver::new(3))
216 })
217 .await;
218 assert_snapshot!(plan, @"DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MinTemp, MaxTemp, Rainfall, Evaporation, Sunshine, WindGustDir, WindGustSpeed, WindDir9am, WindDir3pm, WindSpeed9am, WindSpeed3pm, Humidity9am, Humidity3pm, Pressure9am, Pressure3pm, Cloud9am, Cloud3pm, Temp9am, Temp3pm, RainToday, RISK_MM, RainTomorrow], file_type=parquet");
219 }
220
221 #[tokio::test]
222 async fn test_aggregation() {
223 let query = r#"
224 SELECT count(*), "RainToday" FROM weather GROUP BY "RainToday" ORDER BY count(*)
225 "#;
226 let plan = sql_to_explain(query, |b| {
227 b.with_distributed_worker_resolver(InMemoryWorkerResolver::new(3))
228 })
229 .await;
230 assert_snapshot!(plan, @"
231 ┌───── DistributedExec ── Tasks: t0:[p0]
232 │ ProjectionExec: expr=[count(*)@0 as count(*), RainToday@1 as RainToday]
233 │ SortPreservingMergeExec: [count(Int64(1))@2 ASC NULLS LAST]
234 │ [Stage 2] => NetworkCoalesceExec: output_partitions=8, input_tasks=2
235 └──────────────────────────────────────────────────
236 ┌───── Stage 2 ── Tasks: t0:[p0..p3] t1:[p0..p3]
237 │ SortExec: expr=[count(*)@0 ASC NULLS LAST], preserve_partitioning=[true]
238 │ ProjectionExec: expr=[count(Int64(1))@1 as count(*), RainToday@0 as RainToday, count(Int64(1))@1 as count(Int64(1))]
239 │ AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
240 │ [Stage 1] => NetworkShuffleExec: output_partitions=4, input_tasks=3
241 └──────────────────────────────────────────────────
242 ┌───── Stage 1 ── Tasks: t0:[p0..p7] t1:[p0..p7] t2:[p0..p7]
243 │ RepartitionExec: partitioning=Hash([RainToday@0], 8), input_partitions=1
244 │ AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
245 │ PartitionIsolatorExec: tasks=3 partitions=3
246 │ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[RainToday], file_type=parquet
247 └──────────────────────────────────────────────────
248 ");
249 }
250
251 #[tokio::test]
252 async fn test_aggregation_with_fewer_workers_than_files() {
253 let query = r#"
254 SELECT count(*), "RainToday" FROM weather GROUP BY "RainToday" ORDER BY count(*)
255 "#;
256 let plan = sql_to_explain(query, |b| {
257 b.with_distributed_worker_resolver(InMemoryWorkerResolver::new(2))
258 })
259 .await;
260 assert_snapshot!(plan, @"
261 ┌───── DistributedExec ── Tasks: t0:[p0]
262 │ ProjectionExec: expr=[count(*)@0 as count(*), RainToday@1 as RainToday]
263 │ SortPreservingMergeExec: [count(Int64(1))@2 ASC NULLS LAST]
264 │ [Stage 2] => NetworkCoalesceExec: output_partitions=8, input_tasks=2
265 └──────────────────────────────────────────────────
266 ┌───── Stage 2 ── Tasks: t0:[p0..p3] t1:[p0..p3]
267 │ SortExec: expr=[count(*)@0 ASC NULLS LAST], preserve_partitioning=[true]
268 │ ProjectionExec: expr=[count(Int64(1))@1 as count(*), RainToday@0 as RainToday, count(Int64(1))@1 as count(Int64(1))]
269 │ AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
270 │ [Stage 1] => NetworkShuffleExec: output_partitions=4, input_tasks=2
271 └──────────────────────────────────────────────────
272 ┌───── Stage 1 ── Tasks: t0:[p0..p7] t1:[p0..p7]
273 │ RepartitionExec: partitioning=Hash([RainToday@0], 8), input_partitions=2
274 │ AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
275 │ PartitionIsolatorExec: tasks=2 partitions=3
276 │ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[RainToday], file_type=parquet
277 └──────────────────────────────────────────────────
278 ");
279 }
280
281 #[tokio::test]
282 async fn test_aggregation_with_0_workers() {
283 let query = r#"
284 SELECT count(*), "RainToday" FROM weather GROUP BY "RainToday" ORDER BY count(*)
285 "#;
286 let plan = sql_to_explain(query, |b| {
287 b.with_distributed_worker_resolver(InMemoryWorkerResolver::new(0))
288 })
289 .await;
290 assert_snapshot!(plan, @r"
291 ProjectionExec: expr=[count(*)@0 as count(*), RainToday@1 as RainToday]
292 SortPreservingMergeExec: [count(Int64(1))@2 ASC NULLS LAST]
293 SortExec: expr=[count(*)@0 ASC NULLS LAST], preserve_partitioning=[true]
294 ProjectionExec: expr=[count(Int64(1))@1 as count(*), RainToday@0 as RainToday, count(Int64(1))@1 as count(Int64(1))]
295 AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
296 RepartitionExec: partitioning=Hash([RainToday@0], 4), input_partitions=3
297 AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
298 DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[RainToday], file_type=parquet
299 ");
300 }
301
302 #[tokio::test]
303 async fn test_aggregation_with_high_cardinality_factor() {
304 let query = r#"
305 SELECT count(*), "RainToday" FROM weather GROUP BY "RainToday" ORDER BY count(*)
306 "#;
307 let plan = sql_to_explain(query, |b| {
308 b.with_distributed_worker_resolver(InMemoryWorkerResolver::new(3))
309 .with_distributed_cardinality_effect_task_scale_factor(3.0)
310 .unwrap()
311 })
312 .await;
313 assert_snapshot!(plan, @"
314 ┌───── DistributedExec ── Tasks: t0:[p0]
315 │ ProjectionExec: expr=[count(*)@0 as count(*), RainToday@1 as RainToday]
316 │ SortPreservingMergeExec: [count(Int64(1))@2 ASC NULLS LAST]
317 │ SortExec: expr=[count(*)@0 ASC NULLS LAST], preserve_partitioning=[true]
318 │ ProjectionExec: expr=[count(Int64(1))@1 as count(*), RainToday@0 as RainToday, count(Int64(1))@1 as count(Int64(1))]
319 │ AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
320 │ [Stage 1] => NetworkShuffleExec: output_partitions=4, input_tasks=3
321 └──────────────────────────────────────────────────
322 ┌───── Stage 1 ── Tasks: t0:[p0..p3] t1:[p0..p3] t2:[p0..p3]
323 │ RepartitionExec: partitioning=Hash([RainToday@0], 4), input_partitions=1
324 │ AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
325 │ PartitionIsolatorExec: tasks=3 partitions=3
326 │ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[RainToday], file_type=parquet
327 └──────────────────────────────────────────────────
328 ");
329 }
330
331 #[tokio::test]
332 async fn test_aggregation_with_a_lot_of_files_per_task() {
333 let query = r#"
334 SELECT count(*), "RainToday" FROM weather GROUP BY "RainToday" ORDER BY count(*)
335 "#;
336 let plan = sql_to_explain(query, |b| {
337 b.with_distributed_worker_resolver(InMemoryWorkerResolver::new(3))
338 .with_distributed_files_per_task(3)
339 .unwrap()
340 })
341 .await;
342 assert_snapshot!(plan, @r"
343 ProjectionExec: expr=[count(*)@0 as count(*), RainToday@1 as RainToday]
344 SortPreservingMergeExec: [count(Int64(1))@2 ASC NULLS LAST]
345 SortExec: expr=[count(*)@0 ASC NULLS LAST], preserve_partitioning=[true]
346 ProjectionExec: expr=[count(Int64(1))@1 as count(*), RainToday@0 as RainToday, count(Int64(1))@1 as count(Int64(1))]
347 AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
348 RepartitionExec: partitioning=Hash([RainToday@0], 4), input_partitions=3
349 AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
350 DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[RainToday], file_type=parquet
351 ");
352 }
353
354 #[tokio::test]
355 async fn test_aggregation_with_partitions_per_task() {
356 let query = r#"
357 SELECT count(*), "RainToday" FROM weather GROUP BY "RainToday" ORDER BY count(*)
358 "#;
359 let plan = sql_to_explain(query, |b| {
360 b.with_distributed_worker_resolver(InMemoryWorkerResolver::new(3))
361 })
362 .await;
363 assert_snapshot!(plan, @"
364 ┌───── DistributedExec ── Tasks: t0:[p0]
365 │ ProjectionExec: expr=[count(*)@0 as count(*), RainToday@1 as RainToday]
366 │ SortPreservingMergeExec: [count(Int64(1))@2 ASC NULLS LAST]
367 │ [Stage 2] => NetworkCoalesceExec: output_partitions=8, input_tasks=2
368 └──────────────────────────────────────────────────
369 ┌───── Stage 2 ── Tasks: t0:[p0..p3] t1:[p0..p3]
370 │ SortExec: expr=[count(*)@0 ASC NULLS LAST], preserve_partitioning=[true]
371 │ ProjectionExec: expr=[count(Int64(1))@1 as count(*), RainToday@0 as RainToday, count(Int64(1))@1 as count(Int64(1))]
372 │ AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
373 │ [Stage 1] => NetworkShuffleExec: output_partitions=4, input_tasks=3
374 └──────────────────────────────────────────────────
375 ┌───── Stage 1 ── Tasks: t0:[p0..p7] t1:[p0..p7] t2:[p0..p7]
376 │ RepartitionExec: partitioning=Hash([RainToday@0], 8), input_partitions=1
377 │ AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
378 │ PartitionIsolatorExec: tasks=3 partitions=3
379 │ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[RainToday], file_type=parquet
380 └──────────────────────────────────────────────────
381 ");
382 }
383
384 #[tokio::test]
385 async fn test_left_join() {
386 let query = r#"
387 SELECT a."MinTemp", b."MaxTemp" FROM weather a LEFT JOIN weather b ON a."RainToday" = b."RainToday"
388 "#;
389 let plan = sql_to_explain(query, |b| {
390 b.with_distributed_worker_resolver(InMemoryWorkerResolver::new(3))
391 })
392 .await;
393 assert_snapshot!(plan, @r"
394 HashJoinExec: mode=CollectLeft, join_type=Left, on=[(RainToday@1, RainToday@1)], projection=[MinTemp@0, MaxTemp@2]
395 CoalescePartitionsExec
396 DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MinTemp, RainToday], file_type=parquet
397 DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MaxTemp, RainToday], file_type=parquet
398 ");
399 }
400
401 #[tokio::test]
402 async fn test_left_join_distributed() {
403 let query = r#"
404 WITH a AS (
405 SELECT
406 AVG("MinTemp") as "MinTemp",
407 "RainTomorrow"
408 FROM weather
409 WHERE "RainToday" = 'yes'
410 GROUP BY "RainTomorrow"
411 ), b AS (
412 SELECT
413 AVG("MaxTemp") as "MaxTemp",
414 "RainTomorrow"
415 FROM weather
416 WHERE "RainToday" = 'no'
417 GROUP BY "RainTomorrow"
418 )
419 SELECT
420 a."MinTemp",
421 b."MaxTemp"
422 FROM a
423 LEFT JOIN b
424 ON a."RainTomorrow" = b."RainTomorrow"
425 "#;
426 let plan = sql_to_explain(query, |b| {
427 b.with_distributed_worker_resolver(InMemoryWorkerResolver::new(3))
428 })
429 .await;
430 assert_snapshot!(plan, @"
431 ┌───── DistributedExec ── Tasks: t0:[p0]
432 │ CoalescePartitionsExec
433 │ HashJoinExec: mode=CollectLeft, join_type=Left, on=[(RainTomorrow@1, RainTomorrow@1)], projection=[MinTemp@0, MaxTemp@2]
434 │ CoalescePartitionsExec
435 │ [Stage 2] => NetworkCoalesceExec: output_partitions=8, input_tasks=2
436 │ ProjectionExec: expr=[avg(weather.MaxTemp)@1 as MaxTemp, RainTomorrow@0 as RainTomorrow]
437 │ AggregateExec: mode=FinalPartitioned, gby=[RainTomorrow@0 as RainTomorrow], aggr=[avg(weather.MaxTemp)]
438 │ [Stage 3] => NetworkShuffleExec: output_partitions=4, input_tasks=3
439 └──────────────────────────────────────────────────
440 ┌───── Stage 2 ── Tasks: t0:[p0..p3] t1:[p0..p3]
441 │ ProjectionExec: expr=[avg(weather.MinTemp)@1 as MinTemp, RainTomorrow@0 as RainTomorrow]
442 │ AggregateExec: mode=FinalPartitioned, gby=[RainTomorrow@0 as RainTomorrow], aggr=[avg(weather.MinTemp)]
443 │ [Stage 1] => NetworkShuffleExec: output_partitions=4, input_tasks=3
444 └──────────────────────────────────────────────────
445 ┌───── Stage 1 ── Tasks: t0:[p0..p7] t1:[p0..p7] t2:[p0..p7]
446 │ RepartitionExec: partitioning=Hash([RainTomorrow@0], 8), input_partitions=4
447 │ AggregateExec: mode=Partial, gby=[RainTomorrow@1 as RainTomorrow], aggr=[avg(weather.MinTemp)]
448 │ FilterExec: RainToday@1 = yes, projection=[MinTemp@0, RainTomorrow@2]
449 │ RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
450 │ PartitionIsolatorExec: tasks=3 partitions=3
451 │ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MinTemp, RainToday, RainTomorrow], file_type=parquet, predicate=RainToday@19 = yes, pruning_predicate=RainToday_null_count@2 != row_count@3 AND RainToday_min@0 <= yes AND yes <= RainToday_max@1, required_guarantees=[RainToday in (yes)]
452 └──────────────────────────────────────────────────
453 ┌───── Stage 3 ── Tasks: t0:[p0..p3] t1:[p0..p3] t2:[p0..p3]
454 │ RepartitionExec: partitioning=Hash([RainTomorrow@0], 4), input_partitions=4
455 │ AggregateExec: mode=Partial, gby=[RainTomorrow@1 as RainTomorrow], aggr=[avg(weather.MaxTemp)]
456 │ FilterExec: RainToday@1 = no, projection=[MaxTemp@0, RainTomorrow@2]
457 │ RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
458 │ PartitionIsolatorExec: tasks=3 partitions=3
459 │ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MaxTemp, RainToday, RainTomorrow], file_type=parquet, predicate=RainToday@19 = no, pruning_predicate=RainToday_null_count@2 != row_count@3 AND RainToday_min@0 <= no AND no <= RainToday_max@1, required_guarantees=[RainToday in (no)]
460 └──────────────────────────────────────────────────
461 ");
462 }
463
464 #[tokio::test]
465 async fn test_sort() {
466 let query = r#"
467 SELECT * FROM weather ORDER BY "MinTemp" DESC
468 "#;
469 let plan = sql_to_explain(query, |b| {
470 b.with_distributed_worker_resolver(InMemoryWorkerResolver::new(3))
471 })
472 .await;
473 assert_snapshot!(plan, @"
474 ┌───── DistributedExec ── Tasks: t0:[p0]
475 │ SortPreservingMergeExec: [MinTemp@0 DESC]
476 │ [Stage 1] => NetworkCoalesceExec: output_partitions=3, input_tasks=3
477 └──────────────────────────────────────────────────
478 ┌───── Stage 1 ── Tasks: t0:[p0] t1:[p1] t2:[p2]
479 │ SortExec: expr=[MinTemp@0 DESC], preserve_partitioning=[true]
480 │ PartitionIsolatorExec: tasks=3 partitions=3
481 │ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MinTemp, MaxTemp, Rainfall, Evaporation, Sunshine, WindGustDir, WindGustSpeed, WindDir9am, WindDir3pm, WindSpeed9am, WindSpeed3pm, Humidity9am, Humidity3pm, Pressure9am, Pressure3pm, Cloud9am, Cloud3pm, Temp9am, Temp3pm, RainToday, RISK_MM, RainTomorrow], file_type=parquet
482 └──────────────────────────────────────────────────
483 ");
484 }
485
486 #[tokio::test]
487 async fn test_distinct() {
488 let query = r#"
489 SELECT DISTINCT "RainToday", "WindGustDir" FROM weather
490 "#;
491 let plan = sql_to_explain(query, |b| {
492 b.with_distributed_worker_resolver(InMemoryWorkerResolver::new(3))
493 })
494 .await;
495 assert_snapshot!(plan, @"
496 ┌───── DistributedExec ── Tasks: t0:[p0]
497 │ CoalescePartitionsExec
498 │ [Stage 2] => NetworkCoalesceExec: output_partitions=8, input_tasks=2
499 └──────────────────────────────────────────────────
500 ┌───── Stage 2 ── Tasks: t0:[p0..p3] t1:[p0..p3]
501 │ AggregateExec: mode=FinalPartitioned, gby=[RainToday@0 as RainToday, WindGustDir@1 as WindGustDir], aggr=[]
502 │ [Stage 1] => NetworkShuffleExec: output_partitions=4, input_tasks=3
503 └──────────────────────────────────────────────────
504 ┌───── Stage 1 ── Tasks: t0:[p0..p7] t1:[p0..p7] t2:[p0..p7]
505 │ RepartitionExec: partitioning=Hash([RainToday@0, WindGustDir@1], 8), input_partitions=1
506 │ AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday, WindGustDir@1 as WindGustDir], aggr=[]
507 │ PartitionIsolatorExec: tasks=3 partitions=3
508 │ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[RainToday, WindGustDir], file_type=parquet
509 └──────────────────────────────────────────────────
510 ");
511 }
512
513 #[tokio::test]
514 async fn test_show_columns() {
515 let query = r#"
516 SHOW COLUMNS from weather
517 "#;
518 let plan = sql_to_explain(query, |b| {
519 b.with_distributed_worker_resolver(InMemoryWorkerResolver::new(3))
520 })
521 .await;
522 assert_snapshot!(plan, @r"
523 ProjectionExec: expr=[table_catalog@0 as table_catalog, table_schema@1 as table_schema, table_name@2 as table_name, column_name@3 as column_name, data_type@5 as data_type, is_nullable@4 as is_nullable]
524 FilterExec: table_name@2 = weather
525 RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
526 StreamingTableExec: partition_sizes=1, projection=[table_catalog, table_schema, table_name, column_name, is_nullable, data_type]
527 ");
528 }
529
530 #[tokio::test]
531 async fn test_limited_by_worker() {
532 let query = r#"
533 SET datafusion.execution.target_partitions=2;
534 SELECT 1 FROM weather
535 UNION ALL
536 SELECT 1 FROM flights_1m
537 "#;
538 let plan = sql_to_explain(query, |b| {
539 b.with_distributed_worker_resolver(InMemoryWorkerResolver::new(2))
540 })
541 .await;
542 assert_snapshot!(plan, @r"
543 ┌───── DistributedExec ── Tasks: t0:[p0]
544 │ CoalescePartitionsExec
545 │ [Stage 1] => NetworkCoalesceExec: output_partitions=4, input_tasks=2
546 └──────────────────────────────────────────────────
547 ┌───── Stage 1 ── Tasks: t0:[p0..p1] t1:[p2..p3]
548 │ DistributedUnionExec: t0:[c0] t1:[c1]
549 │ DataSourceExec: file_groups={2 groups: [[/testdata/weather/result-000000.parquet, /testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[1 as Int64(1)], file_type=parquet
550 │ DataSourceExec: file_groups={1 group: [[/testdata/flights-1m.parquet]]}, projection=[1 as Int64(1)], file_type=parquet
551 └──────────────────────────────────────────────────
552 ");
553 }
554
555 #[tokio::test]
556 async fn test_limited_by_config() {
557 let query = r#"
558 SET distributed.max_tasks_per_stage=2;
559 SELECT 1 FROM weather
560 UNION ALL
561 SELECT 1 FROM flights_1m
562 "#;
563 let plan = sql_to_explain(query, |b| {
564 b.with_distributed_worker_resolver(InMemoryWorkerResolver::new(3))
565 })
566 .await;
567 assert_snapshot!(plan, @r"
568 ┌───── DistributedExec ── Tasks: t0:[p0]
569 │ CoalescePartitionsExec
570 │ [Stage 1] => NetworkCoalesceExec: output_partitions=6, input_tasks=2
571 └──────────────────────────────────────────────────
572 ┌───── Stage 1 ── Tasks: t0:[p0..p2] t1:[p3..p5]
573 │ DistributedUnionExec: t0:[c0] t1:[c1]
574 │ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[1 as Int64(1)], file_type=parquet
575 │ DataSourceExec: file_groups={1 group: [[/testdata/flights-1m.parquet]]}, projection=[1 as Int64(1)], file_type=parquet
576 └──────────────────────────────────────────────────
577 ");
578 }
579
580 #[tokio::test]
581 async fn test_unioning_2_tables() {
582 let query = r#"
583 set distributed.children_isolator_unions=true;
584 SELECT "MinTemp", "RainToday" FROM weather WHERE "MinTemp" > 10.0
585 UNION ALL
586 SELECT "MaxTemp", "RainToday" FROM weather WHERE "MaxTemp" < 30.0
587 "#;
588 let plan = sql_to_explain(query, |b| {
589 b.with_distributed_worker_resolver(InMemoryWorkerResolver::new(6))
590 })
591 .await;
592 assert_snapshot!(plan, @"
593 ┌───── DistributedExec ── Tasks: t0:[p0]
594 │ CoalescePartitionsExec
595 │ [Stage 1] => NetworkCoalesceExec: output_partitions=24, input_tasks=6
596 └──────────────────────────────────────────────────
597 ┌───── Stage 1 ── Tasks: t0:[p0..p3] t1:[p4..p7] t2:[p8..p11] t3:[p12..p15] t4:[p16..p19] t5:[p20..p23]
598 │ DistributedUnionExec: t0:[c0(0/3)] t1:[c0(1/3)] t2:[c0(2/3)] t3:[c1(0/3)] t4:[c1(1/3)] t5:[c1(2/3)]
599 │ FilterExec: MinTemp@0 > 10
600 │ RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
601 │ PartitionIsolatorExec: tasks=3 partitions=3
602 │ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MinTemp, RainToday], file_type=parquet, predicate=MinTemp@0 > 10, pruning_predicate=MinTemp_null_count@1 != row_count@2 AND MinTemp_max@0 > 10, required_guarantees=[]
603 │ ProjectionExec: expr=[MaxTemp@0 as MinTemp, RainToday@1 as RainToday]
604 │ FilterExec: MaxTemp@0 < 30
605 │ RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
606 │ PartitionIsolatorExec: tasks=3 partitions=3
607 │ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MaxTemp, RainToday], file_type=parquet, predicate=MaxTemp@1 < 30, pruning_predicate=MaxTemp_null_count@1 != row_count@2 AND MaxTemp_min@0 < 30, required_guarantees=[]
608 └──────────────────────────────────────────────────
609 ");
610 }
611
612 #[tokio::test]
613 async fn test_unioning_2_tables_limited_workers() {
614 let query = r#"
615 set distributed.children_isolator_unions=true;
616 SELECT "MinTemp", "RainToday" FROM weather WHERE "MinTemp" > 10.0
617 UNION ALL
618 SELECT "MaxTemp", "RainToday" FROM weather WHERE "MaxTemp" < 30.0
619 "#;
620 let plan = sql_to_explain(query, |b| {
621 b.with_distributed_worker_resolver(InMemoryWorkerResolver::new(3))
622 })
623 .await;
624 assert_snapshot!(plan, @"
625 ┌───── DistributedExec ── Tasks: t0:[p0]
626 │ CoalescePartitionsExec
627 │ [Stage 1] => NetworkCoalesceExec: output_partitions=12, input_tasks=3
628 └──────────────────────────────────────────────────
629 ┌───── Stage 1 ── Tasks: t0:[p0..p3] t1:[p4..p7] t2:[p8..p11]
630 │ DistributedUnionExec: t0:[c0] t1:[c1(0/2)] t2:[c1(1/2)]
631 │ FilterExec: MinTemp@0 > 10
632 │ RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=3
633 │ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MinTemp, RainToday], file_type=parquet, predicate=MinTemp@0 > 10, pruning_predicate=MinTemp_null_count@1 != row_count@2 AND MinTemp_max@0 > 10, required_guarantees=[]
634 │ ProjectionExec: expr=[MaxTemp@0 as MinTemp, RainToday@1 as RainToday]
635 │ FilterExec: MaxTemp@0 < 30
636 │ RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2
637 │ PartitionIsolatorExec: tasks=2 partitions=3
638 │ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MaxTemp, RainToday], file_type=parquet, predicate=MaxTemp@1 < 30, pruning_predicate=MaxTemp_null_count@1 != row_count@2 AND MaxTemp_min@0 < 30, required_guarantees=[]
639 └──────────────────────────────────────────────────
640 ");
641 }
642
643 #[tokio::test]
644 async fn test_unioning_3_tables() {
645 let query = r#"
646 set distributed.children_isolator_unions=true;
647 SELECT "MinTemp", "RainToday" FROM weather WHERE "MinTemp" > 10.0
648 UNION ALL
649 SELECT "MaxTemp", "RainToday" FROM weather WHERE "MaxTemp" < 30.0
650 UNION ALL
651 SELECT "Temp9am", "RainToday" FROM weather WHERE "Temp9am" > 15.0
652 "#;
653 let plan = sql_to_explain(query, |b| {
654 b.with_distributed_worker_resolver(InMemoryWorkerResolver::new(3))
655 })
656 .await;
657 assert_snapshot!(plan, @r"
658 ┌───── DistributedExec ── Tasks: t0:[p0]
659 │ CoalescePartitionsExec
660 │ [Stage 1] => NetworkCoalesceExec: output_partitions=12, input_tasks=3
661 └──────────────────────────────────────────────────
662 ┌───── Stage 1 ── Tasks: t0:[p0..p3] t1:[p4..p7] t2:[p8..p11]
663 │ DistributedUnionExec: t0:[c0] t1:[c1] t2:[c2]
664 │ FilterExec: MinTemp@0 > 10
665 │ RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=3
666 │ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MinTemp, RainToday], file_type=parquet, predicate=MinTemp@0 > 10, pruning_predicate=MinTemp_null_count@1 != row_count@2 AND MinTemp_max@0 > 10, required_guarantees=[]
667 │ ProjectionExec: expr=[MaxTemp@0 as MinTemp, RainToday@1 as RainToday]
668 │ FilterExec: MaxTemp@0 < 30
669 │ RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=3
670 │ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MaxTemp, RainToday], file_type=parquet, predicate=MaxTemp@1 < 30, pruning_predicate=MaxTemp_null_count@1 != row_count@2 AND MaxTemp_min@0 < 30, required_guarantees=[]
671 │ ProjectionExec: expr=[Temp9am@0 as MinTemp, RainToday@1 as RainToday]
672 │ FilterExec: Temp9am@0 > 15
673 │ RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=3
674 │ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[Temp9am, RainToday], file_type=parquet, predicate=Temp9am@17 > 15, pruning_predicate=Temp9am_null_count@1 != row_count@2 AND Temp9am_max@0 > 15, required_guarantees=[]
675 └──────────────────────────────────────────────────
676 ");
677 }
678
/// Plans a five-way `UNION ALL` over `weather` with
/// `distributed.children_isolator_unions` enabled and three in-memory workers,
/// then snapshots the rendered distributed plan.
///
/// The expected snapshot shows a single stage whose `DistributedUnionExec`
/// spreads the five union children over three tasks as
/// `t0:[c0, c1] t1:[c2, c3] t2:[c4]`, read by a `NetworkCoalesceExec` with
/// 24 output partitions and 3 input tasks.
679 #[tokio::test]
680 async fn test_unioning_5_tables() {
681 let query = r#"
682 set distributed.children_isolator_unions=true;
683 SELECT "MinTemp", "RainToday" FROM weather WHERE "MinTemp" > 10.0
684 UNION ALL
685 SELECT "MaxTemp", "RainToday" FROM weather WHERE "MaxTemp" < 30.0
686 UNION ALL
687 SELECT "Temp9am", "RainToday" FROM weather WHERE "Temp9am" > 15.0
688 UNION ALL
689 SELECT "Temp3pm", "RainToday" FROM weather WHERE "Temp3pm" < 25.0
690 UNION ALL
691 SELECT "Rainfall", "RainToday" FROM weather WHERE "Rainfall" > 5.0
692 "#;
    // Override the worker resolver so planning sees exactly three workers.
693 let plan = sql_to_explain(query, |b| {
694 b.with_distributed_worker_resolver(InMemoryWorkerResolver::new(3))
695 })
696 .await;
697 assert_snapshot!(plan, @r"
698 ┌───── DistributedExec ── Tasks: t0:[p0]
699 │ CoalescePartitionsExec
700 │ [Stage 1] => NetworkCoalesceExec: output_partitions=24, input_tasks=3
701 └──────────────────────────────────────────────────
702 ┌───── Stage 1 ── Tasks: t0:[p0..p7] t1:[p8..p15] t2:[p16..p23]
703 │ DistributedUnionExec: t0:[c0, c1] t1:[c2, c3] t2:[c4]
704 │ FilterExec: MinTemp@0 > 10
705 │ RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=3
706 │ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MinTemp, RainToday], file_type=parquet, predicate=MinTemp@0 > 10, pruning_predicate=MinTemp_null_count@1 != row_count@2 AND MinTemp_max@0 > 10, required_guarantees=[]
707 │ ProjectionExec: expr=[MaxTemp@0 as MinTemp, RainToday@1 as RainToday]
708 │ FilterExec: MaxTemp@0 < 30
709 │ RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=3
710 │ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MaxTemp, RainToday], file_type=parquet, predicate=MaxTemp@1 < 30, pruning_predicate=MaxTemp_null_count@1 != row_count@2 AND MaxTemp_min@0 < 30, required_guarantees=[]
711 │ ProjectionExec: expr=[Temp9am@0 as MinTemp, RainToday@1 as RainToday]
712 │ FilterExec: Temp9am@0 > 15
713 │ RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=3
714 │ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[Temp9am, RainToday], file_type=parquet, predicate=Temp9am@17 > 15, pruning_predicate=Temp9am_null_count@1 != row_count@2 AND Temp9am_max@0 > 15, required_guarantees=[]
715 │ ProjectionExec: expr=[Temp3pm@0 as MinTemp, RainToday@1 as RainToday]
716 │ FilterExec: Temp3pm@0 < 25
717 │ RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=3
718 │ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[Temp3pm, RainToday], file_type=parquet, predicate=Temp3pm@18 < 25, pruning_predicate=Temp3pm_null_count@1 != row_count@2 AND Temp3pm_min@0 < 25, required_guarantees=[]
719 │ ProjectionExec: expr=[Rainfall@0 as MinTemp, RainToday@1 as RainToday]
720 │ FilterExec: Rainfall@0 > 5
721 │ RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=3
722 │ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[Rainfall, RainToday], file_type=parquet, predicate=Rainfall@2 > 5, pruning_predicate=Rainfall_null_count@1 != row_count@2 AND Rainfall_max@0 > 5, required_guarantees=[]
723 └──────────────────────────────────────────────────
724 ");
725 }
726
/// Plans a self-join of `weather` on `RainToday` with three workers and
/// broadcast enabled, then snapshots the rendered distributed plan.
///
/// The expected snapshot has two stages: Stage 1 wraps the `MinTemp` scan in a
/// `BroadcastExec`, and Stage 2 runs the `CollectLeft` `HashJoinExec`, reading
/// that side through a `NetworkBroadcastExec`.
727 #[tokio::test]
728 async fn test_broadcast_join() {
729 let query = r#"
730 SELECT a."MinTemp", b."MaxTemp"
731 FROM weather a INNER JOIN weather b
732 ON a."RainToday" = b."RainToday"
733 "#;
    // Arguments: 3 workers, broadcast enabled.
734 let annotated = sql_to_explain_with_broadcast(query, 3, true).await;
735 assert_snapshot!(annotated, @"
736 ┌───── DistributedExec ── Tasks: t0:[p0]
737 │ CoalescePartitionsExec
738 │ [Stage 2] => NetworkCoalesceExec: output_partitions=3, input_tasks=3
739 └──────────────────────────────────────────────────
740 ┌───── Stage 2 ── Tasks: t0:[p0] t1:[p1] t2:[p2]
741 │ HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(RainToday@1, RainToday@1)], projection=[MinTemp@0, MaxTemp@2]
742 │ CoalescePartitionsExec
743 │ [Stage 1] => NetworkBroadcastExec: partitions_per_consumer=1, stage_partitions=3, input_tasks=3
744 │ PartitionIsolatorExec: tasks=3 partitions=3
745 │ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MaxTemp, RainToday], file_type=parquet, predicate=DynamicFilter [ empty ]
746 └──────────────────────────────────────────────────
747 ┌───── Stage 1 ── Tasks: t0:[p0..p2] t1:[p3..p5] t2:[p6..p8]
748 │ BroadcastExec: input_partitions=1, consumer_tasks=3, output_partitions=3
749 │ PartitionIsolatorExec: tasks=3 partitions=3
750 │ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MinTemp, RainToday], file_type=parquet
751 └──────────────────────────────────────────────────
752 ")
753 }
754
/// Plans a three-way join of `weather` (a ⋈ b ⋈ c on `RainToday`) with three
/// workers and broadcast enabled, then snapshots the rendered distributed plan.
///
/// The expected snapshot has three stages. Stage 1 broadcasts the `MinTemp`
/// scan; Stage 2 joins it against the `MaxTemp` scan and wraps that join in a
/// second `BroadcastExec`; Stage 3 joins the Stage 2 output against the
/// `Rainfall` scan.
755 #[tokio::test]
756 async fn test_broadcast_nested_joins() {
757 let query = r#"
758 SELECT a."MinTemp", b."MaxTemp", c."Rainfall"
759 FROM weather a
760 INNER JOIN weather b ON a."RainToday" = b."RainToday"
761 INNER JOIN weather c ON b."RainToday" = c."RainToday"
762 "#;
    // Arguments: 3 workers, broadcast enabled.
763 let plan = sql_to_explain_with_broadcast(query, 3, true).await;
764 assert_snapshot!(plan, @"
765 ┌───── DistributedExec ── Tasks: t0:[p0]
766 │ CoalescePartitionsExec
767 │ [Stage 3] => NetworkCoalesceExec: output_partitions=3, input_tasks=3
768 └──────────────────────────────────────────────────
769 ┌───── Stage 3 ── Tasks: t0:[p0] t1:[p1] t2:[p2]
770 │ HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(RainToday@2, RainToday@1)], projection=[MinTemp@0, MaxTemp@1, Rainfall@3]
771 │ CoalescePartitionsExec
772 │ [Stage 2] => NetworkBroadcastExec: partitions_per_consumer=1, stage_partitions=3, input_tasks=3
773 │ PartitionIsolatorExec: tasks=3 partitions=3
774 │ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[Rainfall, RainToday], file_type=parquet, predicate=DynamicFilter [ empty ]
775 └──────────────────────────────────────────────────
776 ┌───── Stage 2 ── Tasks: t0:[p0..p2] t1:[p3..p5] t2:[p6..p8]
777 │ BroadcastExec: input_partitions=1, consumer_tasks=3, output_partitions=3
778 │ HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(RainToday@1, RainToday@1)], projection=[MinTemp@0, MaxTemp@2, RainToday@3]
779 │ CoalescePartitionsExec
780 │ [Stage 1] => NetworkBroadcastExec: partitions_per_consumer=1, stage_partitions=3, input_tasks=3
781 │ PartitionIsolatorExec: tasks=3 partitions=3
782 │ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MaxTemp, RainToday], file_type=parquet, predicate=DynamicFilter [ empty ]
783 └──────────────────────────────────────────────────
784 ┌───── Stage 1 ── Tasks: t0:[p0..p2] t1:[p3..p5] t2:[p6..p8]
785 │ BroadcastExec: input_partitions=1, consumer_tasks=3, output_partitions=3
786 │ PartitionIsolatorExec: tasks=3 partitions=3
787 │ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MinTemp, RainToday], file_type=parquet
788 └──────────────────────────────────────────────────
789 ")
790 }
791
/// Checks broadcast planning when the join's collected (left) input is a
/// `CoalescePartitionsExec` sitting directly on top of a `DataSourceExec`.
///
/// The first snapshot pins that input shape using `sql_to_physical_plan`.
/// NOTE(review): that helper is defined outside this chunk; presumably it
/// returns the plan without the distributed rule applied. Confirm.
/// The second snapshot pins the two-stage distributed plan produced for the
/// same query with three workers and broadcast enabled.
792 #[tokio::test]
793 async fn test_broadcast_datasource_as_build_child() {
794 let query = r#"
795 SELECT a."MinTemp", b."MaxTemp"
796 FROM weather a INNER JOIN weather b
797 ON a."RainToday" = b."RainToday"
798 "#;
799
    // Precondition: the left join input is CoalescePartitionsExec -> DataSourceExec.
800 let physical_plan = sql_to_physical_plan(query, 4, 3).await;
801 assert_snapshot!(physical_plan, @r"
802 HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(RainToday@1, RainToday@1)], projection=[MinTemp@0, MaxTemp@2]
803 CoalescePartitionsExec
804 DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MinTemp, RainToday], file_type=parquet
805 DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MaxTemp, RainToday], file_type=parquet, predicate=DynamicFilter [ empty ]
806 ");
807
    // Same query, now rendered as a distributed plan (3 workers, broadcast enabled).
808 let plan = sql_to_explain_with_broadcast(query, 3, true).await;
809 assert_snapshot!(plan, @"
810 ┌───── DistributedExec ── Tasks: t0:[p0]
811 │ CoalescePartitionsExec
812 │ [Stage 2] => NetworkCoalesceExec: output_partitions=3, input_tasks=3
813 └──────────────────────────────────────────────────
814 ┌───── Stage 2 ── Tasks: t0:[p0] t1:[p1] t2:[p2]
815 │ HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(RainToday@1, RainToday@1)], projection=[MinTemp@0, MaxTemp@2]
816 │ CoalescePartitionsExec
817 │ [Stage 1] => NetworkBroadcastExec: partitions_per_consumer=1, stage_partitions=3, input_tasks=3
818 │ PartitionIsolatorExec: tasks=3 partitions=3
819 │ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MaxTemp, RainToday], file_type=parquet, predicate=DynamicFilter [ empty ]
820 └──────────────────────────────────────────────────
821 ┌───── Stage 1 ── Tasks: t0:[p0..p2] t1:[p3..p5] t2:[p6..p8]
822 │ BroadcastExec: input_partitions=1, consumer_tasks=3, output_partitions=3
823 │ PartitionIsolatorExec: tasks=3 partitions=3
824 │ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MinTemp, RainToday], file_type=parquet
825 └──────────────────────────────────────────────────
826 ")
827 }
828
/// Plans a `UNION ALL` of three identical broadcast joins with
/// `distributed.children_isolator_unions` enabled, three workers and broadcast
/// enabled, then snapshots the rendered distributed plan.
///
/// The expected snapshot has a single stage: `DistributedUnionExec` assigns one
/// join per task (`t0:[c0] t1:[c1] t2:[c2]`), and each join keeps its
/// `BroadcastExec` (`consumer_tasks=1`) inside that stage, with no
/// `NetworkBroadcastExec` in between.
829 #[tokio::test]
830 async fn test_broadcast_union_children_isolator_plan() {
831 let query = r#"
832 SET distributed.children_isolator_unions = true;
833 SELECT a."MinTemp", b."MaxTemp"
834 FROM weather a INNER JOIN weather b
835 ON a."RainToday" = b."RainToday"
836 UNION ALL
837 SELECT a."MinTemp", b."MaxTemp"
838 FROM weather a INNER JOIN weather b
839 ON a."RainToday" = b."RainToday"
840 UNION ALL
841 SELECT a."MinTemp", b."MaxTemp"
842 FROM weather a INNER JOIN weather b
843 ON a."RainToday" = b."RainToday"
844 "#;
    // Arguments: 3 workers, broadcast enabled.
845 let plan = sql_to_explain_with_broadcast(query, 3, true).await;
846 assert_snapshot!(plan, @r"
847 ┌───── DistributedExec ── Tasks: t0:[p0]
848 │ CoalescePartitionsExec
849 │ [Stage 1] => NetworkCoalesceExec: output_partitions=9, input_tasks=3
850 └──────────────────────────────────────────────────
851 ┌───── Stage 1 ── Tasks: t0:[p0..p2] t1:[p3..p5] t2:[p6..p8]
852 │ DistributedUnionExec: t0:[c0] t1:[c1] t2:[c2]
853 │ HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(RainToday@1, RainToday@1)], projection=[MinTemp@0, MaxTemp@2]
854 │ CoalescePartitionsExec
855 │ BroadcastExec: input_partitions=3, consumer_tasks=1, output_partitions=3
856 │ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MinTemp, RainToday], file_type=parquet
857 │ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MaxTemp, RainToday], file_type=parquet, predicate=DynamicFilter [ empty ]
858 │ HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(RainToday@1, RainToday@1)], projection=[MinTemp@0, MaxTemp@2]
859 │ CoalescePartitionsExec
860 │ BroadcastExec: input_partitions=3, consumer_tasks=1, output_partitions=3
861 │ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MinTemp, RainToday], file_type=parquet
862 │ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MaxTemp, RainToday], file_type=parquet, predicate=DynamicFilter [ empty ]
863 │ HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(RainToday@1, RainToday@1)], projection=[MinTemp@0, MaxTemp@2]
864 │ CoalescePartitionsExec
865 │ BroadcastExec: input_partitions=3, consumer_tasks=1, output_partitions=3
866 │ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MinTemp, RainToday], file_type=parquet
867 │ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MaxTemp, RainToday], file_type=parquet, predicate=DynamicFilter [ empty ]
868 └──────────────────────────────────────────────────
869 ");
870 }
871
/// Plans the `weather` self-join through
/// `sql_to_explain_with_broadcast_one_to_many` with three workers, then
/// snapshots the rendered distributed plan.
///
/// The expected snapshot shows Stage 1 running as a single task
/// (`t0:[p0..p8]`) whose `BroadcastExec` turns 3 input partitions into 9
/// output partitions for 3 consumer tasks. Stage 2 reads it through a
/// `NetworkBroadcastExec` with `partitions_per_consumer=3` and `input_tasks=1`.
872 #[tokio::test]
873 async fn test_broadcast_one_to_many_plan() {
874 let query = r#"
875 SELECT a."MinTemp", b."MaxTemp"
876 FROM weather a INNER JOIN weather b
877 ON a."RainToday" = b."RainToday"
878 "#;
    // This helper installs `BuildSideOneTaskEstimator` on the session builder.
879 let plan = sql_to_explain_with_broadcast_one_to_many(query, 3).await;
880 assert_snapshot!(plan, @"
881 ┌───── DistributedExec ── Tasks: t0:[p0]
882 │ CoalescePartitionsExec
883 │ [Stage 2] => NetworkCoalesceExec: output_partitions=3, input_tasks=3
884 └──────────────────────────────────────────────────
885 ┌───── Stage 2 ── Tasks: t0:[p0] t1:[p1] t2:[p2]
886 │ HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(RainToday@1, RainToday@1)], projection=[MinTemp@0, MaxTemp@2]
887 │ CoalescePartitionsExec
888 │ [Stage 1] => NetworkBroadcastExec: partitions_per_consumer=3, stage_partitions=9, input_tasks=1
889 │ PartitionIsolatorExec: tasks=3 partitions=3
890 │ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MaxTemp, RainToday], file_type=parquet, predicate=DynamicFilter [ empty ]
891 └──────────────────────────────────────────────────
892 ┌───── Stage 1 ── Tasks: t0:[p0..p8]
893 │ BroadcastExec: input_partitions=3, consumer_tasks=3, output_partitions=9
894 │ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[MinTemp, RainToday], file_type=parquet
895 └──────────────────────────────────────────────────
896 ");
897 }
898
899 async fn sql_to_explain(
900 query: &str,
901 f: impl FnOnce(SessionStateBuilder) -> SessionStateBuilder,
902 ) -> String {
903 explain_test_plan(query, TestPlanOptions::default(), true, f).await
904 }
905
906 async fn sql_to_explain_with_broadcast(
907 query: &str,
908 num_workers: usize,
909 broadcast_enabled: bool,
910 ) -> String {
911 sql_to_plan_with_options(query, num_workers, broadcast_enabled, true).await
912 }
913
914 async fn sql_to_explain_with_broadcast_one_to_many(query: &str, num_workers: usize) -> String {
915 let options = TestPlanOptions {
916 target_partitions: 4,
917 num_workers,
918 broadcast_enabled: true,
919 };
920 explain_test_plan(query, options, true, |b| {
921 b.with_distributed_task_estimator(BuildSideOneTaskEstimator)
922 })
923 .await
924 }
925
926 async fn sql_to_plan_with_options(
927 query: &str,
928 num_workers: usize,
929 broadcast_enabled: bool,
930 use_optimizer: bool,
931 ) -> String {
932 let options = TestPlanOptions {
933 target_partitions: 4,
934 num_workers,
935 broadcast_enabled,
936 };
937 explain_test_plan(query, options, use_optimizer, |b| b).await
938 }
939
940 async fn explain_test_plan(
941 query: &str,
942 options: TestPlanOptions,
943 use_optimizer: bool,
944 configure: impl FnOnce(SessionStateBuilder) -> SessionStateBuilder,
945 ) -> String {
946 let mut builder = base_session_builder(
947 options.target_partitions,
948 options.num_workers,
949 options.broadcast_enabled,
950 );
951 if use_optimizer {
952 builder =
953 builder.with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule));
954 }
955 let builder = configure(builder);
956 let (ctx, query) = context_with_query(builder, query).await;
957 let df = ctx.sql(&query).await.unwrap();
958 let physical_plan = df.create_physical_plan().await.unwrap();
959
960 if use_optimizer {
961 display_plan_ascii(physical_plan.as_ref(), false)
962 } else {
963 format!("{}", displayable(physical_plan.as_ref()).indent(true))
964 }
965 }
966}