datafusion-distributed 2.0.0

Framework for enhancing Apache DataFusion with distributed capabilities
Documentation
#[cfg(all(feature = "integration", test))]
mod tests {
    use datafusion::arrow::util::pretty::pretty_format_batches;
    use datafusion::execution::TaskContext;
    use datafusion::physical_plan::{ExecutionPlan, execute_stream};
    use datafusion::prelude::SessionContext;
    use datafusion_distributed::test_utils::localhost::start_localhost_context;
    use datafusion_distributed::test_utils::parquet::register_parquet_tables;
    use datafusion_distributed::{DefaultSessionBuilder, assert_snapshot, display_plan_ascii};
    use futures::TryStreamExt;
    use std::error::Error;
    use std::sync::Arc;

    /// Plans a two-branch `UNION ALL` on the context returned by
    /// `start_localhost_context(3, ...)` with `distributed.children_isolator_unions`
    /// switched on.
    ///
    /// The expected plan's `DistributedUnionExec` line reads
    /// `t0:[c0(0/2)] t1:[c0(1/2)] t2:[c1]`. After the snapshot check, the distributed
    /// plan must produce the same rows as the plan built on a plain `SessionContext`.
    #[tokio::test]
    async fn more_tasks_than_children() -> Result<(), Box<dyn Error>> {
        let (distributed_ctx, _guard, _) = start_localhost_context(3, DefaultSessionBuilder).await;

        let sql = r#"
        SELECT "MinTemp", "RainToday" FROM weather WHERE "MinTemp" > 10.0
        UNION ALL
        SELECT "MaxTemp", "RainToday" FROM weather WHERE "MaxTemp" < 30.0
        ORDER BY "MinTemp", "RainToday"
        "#;

        // Reference plan: a default context that reuses the distributed context's config
        // as it is before the union option below is set.
        let local_ctx = SessionContext::default();
        let shared_config = distributed_ctx.copied_config();
        *local_ctx.state_ref().write().config_mut() = shared_config;
        register_parquet_tables(&local_ctx).await?;
        let single_node_plan = local_ctx.sql(sql).await?.create_physical_plan().await?;

        // Plan under test: same query on the distributed context with the option enabled.
        register_parquet_tables(&distributed_ctx).await?;
        distributed_ctx
            .sql("SET distributed.children_isolator_unions=true;")
            .await?;
        let distributed_plan = distributed_ctx.sql(sql).await?.create_physical_plan().await?;
        let rendered = display_plan_ascii(distributed_plan.as_ref(), false);

        assert_snapshot!(rendered,
            @r"
        ┌───── DistributedExec ── Tasks: t0:[p0]
        │ SortPreservingMergeExec: [MinTemp@0 ASC NULLS LAST, RainToday@1 ASC NULLS LAST]
        │   [Stage 1] => NetworkCoalesceExec: output_partitions=9, input_tasks=3
        └──────────────────────────────────────────────────
          ┌───── Stage 1 ── Tasks: t0:[p0..p2] t1:[p3..p5] t2:[p6..p8]
          │ DistributedUnionExec: t0:[c0(0/2)] t1:[c0(1/2)] t2:[c1]
          │   SortExec: expr=[MinTemp@0 ASC NULLS LAST, RainToday@1 ASC NULLS LAST], preserve_partitioning=[true]
          │     FilterExec: MinTemp@0 > 10
          │       DistributedLeafExec:
          │         t0: DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet:<int>..<int>], [/testdata/weather/result-000000.parquet:<int>..<int>, /testdata/weather/result-000001.parquet:<int>..<int>], [/testdata/weather/result-000002.parquet:<int>..<int>]]}, projection=[MinTemp, RainToday], file_type=parquet, predicate=MinTemp@0 > 10, pruning_predicate=MinTemp_null_count@1 != row_count@2 AND MinTemp_max@0 > 10, required_guarantees=[]
          │         t1: DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet:<int>..<int>], [/testdata/weather/result-000001.parquet:<int>..<int>, /testdata/weather/result-000002.parquet:<int>..<int>], [/testdata/weather/result-000002.parquet:<int>..<int>]]}, projection=[MinTemp, RainToday], file_type=parquet, predicate=MinTemp@0 > 10, pruning_predicate=MinTemp_null_count@1 != row_count@2 AND MinTemp_max@0 > 10, required_guarantees=[]
          │   SortExec: expr=[MinTemp@0 ASC NULLS LAST, RainToday@1 ASC NULLS LAST], preserve_partitioning=[true]
          │     ProjectionExec: expr=[MaxTemp@0 as MinTemp, RainToday@1 as RainToday]
          │       FilterExec: MaxTemp@0 < 30
          │         DistributedLeafExec:
          │           t0: DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet:<int>..<int>], [/testdata/weather/result-000000.parquet:<int>..<int>, /testdata/weather/result-000001.parquet:<int>..<int>, /testdata/weather/result-000002.parquet:<int>..<int>], [/testdata/weather/result-000002.parquet:<int>..<int>]]}, projection=[MaxTemp, RainToday], file_type=parquet, predicate=MaxTemp@1 < 30, pruning_predicate=MaxTemp_null_count@1 != row_count@2 AND MaxTemp_min@0 < 30, required_guarantees=[]
          └──────────────────────────────────────────────────
        ",
        );

        let task_ctx = local_ctx.task_ctx();
        exact_same_data(task_ctx, single_node_plan, distributed_plan).await
    }

    /// Plans a three-branch `UNION ALL` on the context returned by
    /// `start_localhost_context(3, ...)` with `distributed.children_isolator_unions`
    /// switched on.
    ///
    /// The expected plan's `DistributedUnionExec` line reads `t0:[c0] t1:[c1] t2:[c2]`.
    /// After the snapshot check, the distributed plan must produce the same rows as the
    /// plan built on a plain `SessionContext`.
    #[tokio::test]
    async fn same_children_than_tasks() -> Result<(), Box<dyn Error>> {
        let (distributed_ctx, _guard, _) = start_localhost_context(3, DefaultSessionBuilder).await;

        let sql = r#"
        SELECT "MinTemp", "RainToday" FROM weather WHERE "MinTemp" > 20.0
        UNION ALL
        SELECT "MaxTemp", "RainToday" FROM weather WHERE "MaxTemp" < 25.0
        UNION ALL
        SELECT "Temp9am", "RainToday" FROM weather WHERE "Temp9am" > 15.0
        ORDER BY "MinTemp", "RainToday"
        "#;

        // Reference plan: a default context that reuses the distributed context's config
        // as it is before the union option below is set.
        let local_ctx = SessionContext::default();
        let shared_config = distributed_ctx.copied_config();
        *local_ctx.state_ref().write().config_mut() = shared_config;
        register_parquet_tables(&local_ctx).await?;
        let single_node_plan = local_ctx.sql(sql).await?.create_physical_plan().await?;

        // Plan under test: same query on the distributed context with the option enabled.
        register_parquet_tables(&distributed_ctx).await?;
        distributed_ctx
            .sql("SET distributed.children_isolator_unions=true;")
            .await?;
        let distributed_plan = distributed_ctx.sql(sql).await?.create_physical_plan().await?;
        let rendered = display_plan_ascii(distributed_plan.as_ref(), false);

        assert_snapshot!(rendered,
            @r"
        ┌───── DistributedExec ── Tasks: t0:[p0]
        │ SortPreservingMergeExec: [MinTemp@0 ASC NULLS LAST, RainToday@1 ASC NULLS LAST]
        │   [Stage 1] => NetworkCoalesceExec: output_partitions=9, input_tasks=3
        └──────────────────────────────────────────────────
          ┌───── Stage 1 ── Tasks: t0:[p0..p2] t1:[p3..p5] t2:[p6..p8]
          │ DistributedUnionExec: t0:[c0] t1:[c1] t2:[c2]
          │   SortExec: expr=[MinTemp@0 ASC NULLS LAST, RainToday@1 ASC NULLS LAST], preserve_partitioning=[true]
          │     FilterExec: MinTemp@0 > 20
          │       DistributedLeafExec:
          │         t0: DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet:<int>..<int>], [/testdata/weather/result-000000.parquet:<int>..<int>, /testdata/weather/result-000001.parquet:<int>..<int>, /testdata/weather/result-000002.parquet:<int>..<int>], [/testdata/weather/result-000002.parquet:<int>..<int>]]}, projection=[MinTemp, RainToday], file_type=parquet, predicate=MinTemp@0 > 20, pruning_predicate=MinTemp_null_count@1 != row_count@2 AND MinTemp_max@0 > 20, required_guarantees=[]
          │   SortExec: expr=[MinTemp@0 ASC NULLS LAST, RainToday@1 ASC NULLS LAST], preserve_partitioning=[true]
          │     ProjectionExec: expr=[MaxTemp@0 as MinTemp, RainToday@1 as RainToday]
          │       FilterExec: MaxTemp@0 < 25
          │         DistributedLeafExec:
          │           t0: DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet:<int>..<int>], [/testdata/weather/result-000000.parquet:<int>..<int>, /testdata/weather/result-000001.parquet:<int>..<int>, /testdata/weather/result-000002.parquet:<int>..<int>], [/testdata/weather/result-000002.parquet:<int>..<int>]]}, projection=[MaxTemp, RainToday], file_type=parquet, predicate=MaxTemp@1 < 25, pruning_predicate=MaxTemp_null_count@1 != row_count@2 AND MaxTemp_min@0 < 25, required_guarantees=[]
          │   SortExec: expr=[MinTemp@0 ASC NULLS LAST, RainToday@1 ASC NULLS LAST], preserve_partitioning=[true]
          │     ProjectionExec: expr=[Temp9am@0 as MinTemp, RainToday@1 as RainToday]
          │       FilterExec: Temp9am@0 > 15
          │         DistributedLeafExec:
          │           t0: DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet:<int>..<int>], [/testdata/weather/result-000000.parquet:<int>..<int>, /testdata/weather/result-000001.parquet:<int>..<int>, /testdata/weather/result-000002.parquet:<int>..<int>], [/testdata/weather/result-000002.parquet:<int>..<int>]]}, projection=[Temp9am, RainToday], file_type=parquet, predicate=Temp9am@17 > 15, pruning_predicate=Temp9am_null_count@1 != row_count@2 AND Temp9am_max@0 > 15, required_guarantees=[]
          └──────────────────────────────────────────────────
        ",
        );

        let task_ctx = local_ctx.task_ctx();
        exact_same_data(task_ctx, single_node_plan, distributed_plan).await
    }

    /// Plans a five-branch `UNION ALL` on the context returned by
    /// `start_localhost_context(3, ...)` with `distributed.children_isolator_unions`
    /// switched on.
    ///
    /// The expected plan's `DistributedUnionExec` line reads
    /// `t0:[c0, c3] t1:[c1, c4] t2:[c2]`. After the snapshot check, the distributed plan
    /// must produce the same rows as the plan built on a plain `SessionContext`.
    #[tokio::test]
    async fn more_children_than_tasks() -> Result<(), Box<dyn Error>> {
        let (distributed_ctx, _guard, _) = start_localhost_context(3, DefaultSessionBuilder).await;

        let sql = r#"
        SELECT "MinTemp", "RainToday" FROM weather WHERE "MinTemp" > 10.0
        UNION ALL
        SELECT "MaxTemp", "RainToday" FROM weather WHERE "MaxTemp" < 30.0
        UNION ALL
        SELECT "Temp9am", "RainToday" FROM weather WHERE "Temp9am" > 15.0
        UNION ALL
        SELECT "Temp3pm", "RainToday" FROM weather WHERE "Temp3pm" < 25.0
        UNION ALL
        SELECT "Rainfall", "RainToday" FROM weather WHERE "Rainfall" > 5.0
        ORDER BY "MinTemp", "RainToday"
        "#;

        // Reference plan: a default context that reuses the distributed context's config
        // as it is before the union option below is set.
        let local_ctx = SessionContext::default();
        let shared_config = distributed_ctx.copied_config();
        *local_ctx.state_ref().write().config_mut() = shared_config;
        register_parquet_tables(&local_ctx).await?;
        let single_node_plan = local_ctx.sql(sql).await?.create_physical_plan().await?;

        // Plan under test: same query on the distributed context with the option enabled.
        register_parquet_tables(&distributed_ctx).await?;
        distributed_ctx
            .sql("SET distributed.children_isolator_unions=true;")
            .await?;
        let distributed_plan = distributed_ctx.sql(sql).await?.create_physical_plan().await?;
        let rendered = display_plan_ascii(distributed_plan.as_ref(), false);

        assert_snapshot!(rendered,
            @r"
        ┌───── DistributedExec ── Tasks: t0:[p0]
        │ SortPreservingMergeExec: [MinTemp@0 ASC NULLS LAST, RainToday@1 ASC NULLS LAST]
        │   [Stage 1] => NetworkCoalesceExec: output_partitions=18, input_tasks=3
        └──────────────────────────────────────────────────
          ┌───── Stage 1 ── Tasks: t0:[p0..p5] t1:[p6..p11] t2:[p12..p17]
          │ DistributedUnionExec: t0:[c0, c3] t1:[c1, c4] t2:[c2]
          │   SortExec: expr=[MinTemp@0 ASC NULLS LAST, RainToday@1 ASC NULLS LAST], preserve_partitioning=[true]
          │     FilterExec: MinTemp@0 > 10
          │       DistributedLeafExec:
          │         t0: DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet:<int>..<int>], [/testdata/weather/result-000000.parquet:<int>..<int>, /testdata/weather/result-000001.parquet:<int>..<int>, /testdata/weather/result-000002.parquet:<int>..<int>], [/testdata/weather/result-000002.parquet:<int>..<int>]]}, projection=[MinTemp, RainToday], file_type=parquet, predicate=MinTemp@0 > 10, pruning_predicate=MinTemp_null_count@1 != row_count@2 AND MinTemp_max@0 > 10, required_guarantees=[]
          │   SortExec: expr=[MinTemp@0 ASC NULLS LAST, RainToday@1 ASC NULLS LAST], preserve_partitioning=[true]
          │     ProjectionExec: expr=[MaxTemp@0 as MinTemp, RainToday@1 as RainToday]
          │       FilterExec: MaxTemp@0 < 30
          │         DistributedLeafExec:
          │           t0: DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet:<int>..<int>], [/testdata/weather/result-000000.parquet:<int>..<int>, /testdata/weather/result-000001.parquet:<int>..<int>, /testdata/weather/result-000002.parquet:<int>..<int>], [/testdata/weather/result-000002.parquet:<int>..<int>]]}, projection=[MaxTemp, RainToday], file_type=parquet, predicate=MaxTemp@1 < 30, pruning_predicate=MaxTemp_null_count@1 != row_count@2 AND MaxTemp_min@0 < 30, required_guarantees=[]
          │   SortExec: expr=[MinTemp@0 ASC NULLS LAST, RainToday@1 ASC NULLS LAST], preserve_partitioning=[true]
          │     ProjectionExec: expr=[Temp9am@0 as MinTemp, RainToday@1 as RainToday]
          │       FilterExec: Temp9am@0 > 15
          │         DistributedLeafExec:
          │           t0: DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet:<int>..<int>], [/testdata/weather/result-000000.parquet:<int>..<int>, /testdata/weather/result-000001.parquet:<int>..<int>, /testdata/weather/result-000002.parquet:<int>..<int>], [/testdata/weather/result-000002.parquet:<int>..<int>]]}, projection=[Temp9am, RainToday], file_type=parquet, predicate=Temp9am@17 > 15, pruning_predicate=Temp9am_null_count@1 != row_count@2 AND Temp9am_max@0 > 15, required_guarantees=[]
          │   SortExec: expr=[MinTemp@0 ASC NULLS LAST, RainToday@1 ASC NULLS LAST], preserve_partitioning=[true]
          │     ProjectionExec: expr=[Temp3pm@0 as MinTemp, RainToday@1 as RainToday]
          │       FilterExec: Temp3pm@0 < 25
          │         DistributedLeafExec:
          │           t0: DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet:<int>..<int>], [/testdata/weather/result-000000.parquet:<int>..<int>, /testdata/weather/result-000001.parquet:<int>..<int>, /testdata/weather/result-000002.parquet:<int>..<int>], [/testdata/weather/result-000002.parquet:<int>..<int>]]}, projection=[Temp3pm, RainToday], file_type=parquet, predicate=Temp3pm@18 < 25, pruning_predicate=Temp3pm_null_count@1 != row_count@2 AND Temp3pm_min@0 < 25, required_guarantees=[]
          │   SortExec: expr=[MinTemp@0 ASC NULLS LAST, RainToday@1 ASC NULLS LAST], preserve_partitioning=[true]
          │     ProjectionExec: expr=[Rainfall@0 as MinTemp, RainToday@1 as RainToday]
          │       FilterExec: Rainfall@0 > 5
          │         DistributedLeafExec:
          │           t0: DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet:<int>..<int>], [/testdata/weather/result-000000.parquet:<int>..<int>, /testdata/weather/result-000001.parquet:<int>..<int>, /testdata/weather/result-000002.parquet:<int>..<int>], [/testdata/weather/result-000002.parquet:<int>..<int>]]}, projection=[Rainfall, RainToday], file_type=parquet, predicate=Rainfall@2 > 5, pruning_predicate=Rainfall_null_count@1 != row_count@2 AND Rainfall_max@0 > 5, required_guarantees=[]
          └──────────────────────────────────────────────────
        ",
        );

        let task_ctx = local_ctx.task_ctx();
        exact_same_data(task_ctx, single_node_plan, distributed_plan).await
    }

    /// Plans a `UNION ALL` whose two inputs are themselves limited `UNION ALL`
    /// subqueries, on the context returned by `start_localhost_context(3, ...)` with
    /// `distributed.children_isolator_unions` switched on.
    ///
    /// The expected plan contains `DistributedUnionExec` nodes in stages 7, 3 and 6.
    /// After the snapshot check, the distributed plan must produce the same rows as the
    /// plan built on a plain `SessionContext`.
    #[tokio::test]
    async fn nested_unions() -> Result<(), Box<dyn Error>> {
        let (distributed_ctx, _guard, _) = start_localhost_context(3, DefaultSessionBuilder).await;

        // Each inner subquery carries a LIMIT so that the logical optimizer cannot
        // flatten the nested `UNION ALL`s into one `Union`; the physical plan therefore
        // ends up with a `UnionExec` whose child is another `UnionExec`.
        let sql = r#"
        SELECT * FROM (
            SELECT "MinTemp", "RainToday" FROM weather WHERE "MinTemp" > 10.0
            UNION ALL
            SELECT "MaxTemp", "RainToday" FROM weather WHERE "MaxTemp" < 30.0
            LIMIT 1000000
        )
        UNION ALL
        SELECT * FROM (
            SELECT "Temp9am", "RainToday" FROM weather WHERE "Temp9am" > 15.0
            UNION ALL
            SELECT "Temp3pm", "RainToday" FROM weather WHERE "Temp3pm" < 25.0
            LIMIT 1000000
        )
        ORDER BY "MinTemp", "RainToday"
        "#;

        // Reference plan: a default context that reuses the distributed context's config
        // as it is before the union option below is set.
        let local_ctx = SessionContext::default();
        let shared_config = distributed_ctx.copied_config();
        *local_ctx.state_ref().write().config_mut() = shared_config;
        register_parquet_tables(&local_ctx).await?;
        let single_node_plan = local_ctx.sql(sql).await?.create_physical_plan().await?;

        // Plan under test: same query on the distributed context with the option enabled.
        register_parquet_tables(&distributed_ctx).await?;
        distributed_ctx
            .sql("SET distributed.children_isolator_unions=true;")
            .await?;
        let distributed_plan = distributed_ctx.sql(sql).await?.create_physical_plan().await?;
        let rendered = display_plan_ascii(distributed_plan.as_ref(), false);

        assert_snapshot!(rendered,
            @r"
        ┌───── DistributedExec ── Tasks: t0:[p0]
        │ SortPreservingMergeExec: [MinTemp@0 ASC NULLS LAST, RainToday@1 ASC NULLS LAST]
        │   [Stage 7] => NetworkCoalesceExec: output_partitions=2, input_tasks=2
        └──────────────────────────────────────────────────
          ┌───── Stage 7 ── Tasks: t0:[p0] t1:[p1]
          │ DistributedUnionExec: t0:[c0] t1:[c1]
          │   SortExec: TopK(fetch=1000000), expr=[MinTemp@0 ASC NULLS LAST, RainToday@1 ASC NULLS LAST], preserve_partitioning=[false]
          │     CoalescePartitionsExec
          │       [Stage 3] => NetworkCoalesceExec: output_partitions=2, input_tasks=2
          │   SortExec: expr=[MinTemp@0 ASC NULLS LAST, RainToday@1 ASC NULLS LAST], preserve_partitioning=[false]
          │     ProjectionExec: expr=[Temp9am@0 as MinTemp, RainToday@1 as RainToday]
          │       CoalescePartitionsExec: fetch=1000000
          │         [Stage 6] => NetworkCoalesceExec: output_partitions=2, input_tasks=2
          └──────────────────────────────────────────────────
            ┌───── Stage 3 ── Tasks: t0:[p0] t1:[p1]
            │ DistributedUnionExec: t0:[c0] t1:[c1]
            │   CoalescePartitionsExec: fetch=1000000
            │     [Stage 1] => NetworkCoalesceExec: output_partitions=9, input_tasks=3
            │   ProjectionExec: expr=[MaxTemp@0 as MinTemp, RainToday@1 as RainToday]
            │     CoalescePartitionsExec: fetch=1000000
            │       [Stage 2] => NetworkCoalesceExec: output_partitions=9, input_tasks=3
            └──────────────────────────────────────────────────
              ┌───── Stage 1 ── Tasks: t0:[p0..p2] t1:[p3..p5] t2:[p6..p8]
              │ FilterExec: MinTemp@0 > 10, fetch=1000000
              │   DistributedLeafExec:
              │     t0: DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet:<int>..<int>], [/testdata/weather/result-000000.parquet:<int>..<int>, /testdata/weather/result-000001.parquet:<int>..<int>], [/testdata/weather/result-000002.parquet:<int>..<int>]]}, projection=[MinTemp, RainToday], file_type=parquet, predicate=MinTemp@0 > 10 AND DynamicFilter [ empty ], pruning_predicate=MinTemp_null_count@1 != row_count@2 AND MinTemp_max@0 > 10, required_guarantees=[]
              │     t1: DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet:<int>..<int>], [/testdata/weather/result-000001.parquet:<int>..<int>], [/testdata/weather/result-000002.parquet:<int>..<int>]]}, projection=[MinTemp, RainToday], file_type=parquet, predicate=MinTemp@0 > 10 AND DynamicFilter [ empty ], pruning_predicate=MinTemp_null_count@1 != row_count@2 AND MinTemp_max@0 > 10, required_guarantees=[]
              │     t2: DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet:<int>..<int>], [/testdata/weather/result-000001.parquet:<int>..<int>, /testdata/weather/result-000002.parquet:<int>..<int>], [/testdata/weather/result-000002.parquet:<int>..<int>]]}, projection=[MinTemp, RainToday], file_type=parquet, predicate=MinTemp@0 > 10 AND DynamicFilter [ empty ], pruning_predicate=MinTemp_null_count@1 != row_count@2 AND MinTemp_max@0 > 10, required_guarantees=[]
              └──────────────────────────────────────────────────
              ┌───── Stage 2 ── Tasks: t0:[p0..p2] t1:[p3..p5] t2:[p6..p8]
              │ FilterExec: MaxTemp@0 < 30, fetch=1000000
              │   DistributedLeafExec:
              │     t0: DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet:<int>..<int>], [/testdata/weather/result-000000.parquet:<int>..<int>, /testdata/weather/result-000001.parquet:<int>..<int>], [/testdata/weather/result-000002.parquet:<int>..<int>]]}, projection=[MaxTemp, RainToday], file_type=parquet, predicate=MaxTemp@1 < 30 AND DynamicFilter [ empty ], pruning_predicate=MaxTemp_null_count@1 != row_count@2 AND MaxTemp_min@0 < 30, required_guarantees=[]
              │     t1: DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet:<int>..<int>], [/testdata/weather/result-000001.parquet:<int>..<int>], [/testdata/weather/result-000002.parquet:<int>..<int>]]}, projection=[MaxTemp, RainToday], file_type=parquet, predicate=MaxTemp@1 < 30 AND DynamicFilter [ empty ], pruning_predicate=MaxTemp_null_count@1 != row_count@2 AND MaxTemp_min@0 < 30, required_guarantees=[]
              │     t2: DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet:<int>..<int>], [/testdata/weather/result-000001.parquet:<int>..<int>, /testdata/weather/result-000002.parquet:<int>..<int>], [/testdata/weather/result-000002.parquet:<int>..<int>]]}, projection=[MaxTemp, RainToday], file_type=parquet, predicate=MaxTemp@1 < 30 AND DynamicFilter [ empty ], pruning_predicate=MaxTemp_null_count@1 != row_count@2 AND MaxTemp_min@0 < 30, required_guarantees=[]
              └──────────────────────────────────────────────────
            ┌───── Stage 6 ── Tasks: t0:[p0] t1:[p1]
            │ LocalLimitExec: fetch=1000000
            │   DistributedUnionExec: t0:[c0] t1:[c1]
            │     CoalescePartitionsExec: fetch=1000000
            │       [Stage 4] => NetworkCoalesceExec: output_partitions=9, input_tasks=3
            │     ProjectionExec: expr=[Temp3pm@0 as Temp9am, RainToday@1 as RainToday]
            │       CoalescePartitionsExec: fetch=1000000
            │         [Stage 5] => NetworkCoalesceExec: output_partitions=9, input_tasks=3
            └──────────────────────────────────────────────────
              ┌───── Stage 4 ── Tasks: t0:[p0..p2] t1:[p3..p5] t2:[p6..p8]
              │ FilterExec: Temp9am@0 > 15, fetch=1000000
              │   DistributedLeafExec:
              │     t0: DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet:<int>..<int>], [/testdata/weather/result-000000.parquet:<int>..<int>, /testdata/weather/result-000001.parquet:<int>..<int>], [/testdata/weather/result-000002.parquet:<int>..<int>]]}, projection=[Temp9am, RainToday], file_type=parquet, predicate=Temp9am@17 > 15, pruning_predicate=Temp9am_null_count@1 != row_count@2 AND Temp9am_max@0 > 15, required_guarantees=[]
              │     t1: DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet:<int>..<int>], [/testdata/weather/result-000001.parquet:<int>..<int>], [/testdata/weather/result-000002.parquet:<int>..<int>]]}, projection=[Temp9am, RainToday], file_type=parquet, predicate=Temp9am@17 > 15, pruning_predicate=Temp9am_null_count@1 != row_count@2 AND Temp9am_max@0 > 15, required_guarantees=[]
              │     t2: DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet:<int>..<int>], [/testdata/weather/result-000001.parquet:<int>..<int>, /testdata/weather/result-000002.parquet:<int>..<int>], [/testdata/weather/result-000002.parquet:<int>..<int>]]}, projection=[Temp9am, RainToday], file_type=parquet, predicate=Temp9am@17 > 15, pruning_predicate=Temp9am_null_count@1 != row_count@2 AND Temp9am_max@0 > 15, required_guarantees=[]
              └──────────────────────────────────────────────────
              ┌───── Stage 5 ── Tasks: t0:[p0..p2] t1:[p3..p5] t2:[p6..p8]
              │ FilterExec: Temp3pm@0 < 25, fetch=1000000
              │   DistributedLeafExec:
              │     t0: DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet:<int>..<int>], [/testdata/weather/result-000000.parquet:<int>..<int>, /testdata/weather/result-000001.parquet:<int>..<int>], [/testdata/weather/result-000002.parquet:<int>..<int>]]}, projection=[Temp3pm, RainToday], file_type=parquet, predicate=Temp3pm@18 < 25, pruning_predicate=Temp3pm_null_count@1 != row_count@2 AND Temp3pm_min@0 < 25, required_guarantees=[]
              │     t1: DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet:<int>..<int>], [/testdata/weather/result-000001.parquet:<int>..<int>], [/testdata/weather/result-000002.parquet:<int>..<int>]]}, projection=[Temp3pm, RainToday], file_type=parquet, predicate=Temp3pm@18 < 25, pruning_predicate=Temp3pm_null_count@1 != row_count@2 AND Temp3pm_min@0 < 25, required_guarantees=[]
              │     t2: DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet:<int>..<int>], [/testdata/weather/result-000001.parquet:<int>..<int>, /testdata/weather/result-000002.parquet:<int>..<int>], [/testdata/weather/result-000002.parquet:<int>..<int>]]}, projection=[Temp3pm, RainToday], file_type=parquet, predicate=Temp3pm@18 < 25, pruning_predicate=Temp3pm_null_count@1 != row_count@2 AND Temp3pm_min@0 < 25, required_guarantees=[]
              └──────────────────────────────────────────────────
        ",
        );

        let task_ctx = local_ctx.task_ctx();
        exact_same_data(task_ctx, single_node_plan, distributed_plan).await
    }

    /// Runs `one` and then `other` with the same task context and asserts that the
    /// pretty-printed record batches of both plans are textually identical.
    ///
    /// # Errors
    ///
    /// Propagates any error raised while starting a stream, collecting its batches,
    /// or formatting them.
    ///
    /// # Panics
    ///
    /// Panics (via `assert_eq!`) when the two rendered outputs differ.
    async fn exact_same_data(
        task_ctx: Arc<TaskContext>,
        one: Arc<dyn ExecutionPlan>,
        other: Arc<dyn ExecutionPlan>,
    ) -> Result<(), Box<dyn Error>> {
        // First plan: drain the stream fully, then render the batches as a table.
        let first_stream = execute_stream(one, Arc::clone(&task_ctx))?;
        let first_batches = first_stream.try_collect::<Vec<_>>().await?;
        let first_rendered = pretty_format_batches(&first_batches)?.to_string();

        // Second plan: same procedure; the task context is moved in here since it is
        // not needed afterwards.
        let second_stream = execute_stream(other, task_ctx)?;
        let second_batches = second_stream.try_collect::<Vec<_>>().await?;
        let second_rendered = pretty_format_batches(&second_batches)?.to_string();

        // Both plans must yield exactly the same formatted output.
        assert_eq!(first_rendered, second_rendered);
        Ok(())
    }
}