datafusion-distributed 2.0.0

Framework for enhancing Apache DataFusion with distributed capabilities
# Custom Execution Plan Example

Demonstrates how to create a distributed custom execution plan with a `numbers(start, end)` table function.

## Components

**NumbersTableFunction** – Table function callable in SQL: `SELECT * FROM numbers(1, 100)`

**NumbersExec** – Execution plan whose `ranges_per_task: Vec<Range<i64>>` field stores one range per task.
It uses `DistributedTaskContext` to determine which range the current task generates.
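A minimal, framework-free sketch of that per-task lookup follows. It assumes the current task's index is available as a plain `usize`; the `task_index` parameter and the `numbers_for_task` function are hypothetical stand-ins for whatever `DistributedTaskContext` actually exposes, not the example's code.

```rust
use std::ops::Range;

/// Returns the numbers a single task should produce.
/// `task_index` is a hypothetical stand-in for the index that
/// `DistributedTaskContext` provides in the real example.
fn numbers_for_task(ranges_per_task: &[Range<i64>], task_index: usize) -> Vec<i64> {
    ranges_per_task
        .get(task_index)
        // Clone the stored range so it can be consumed as an iterator.
        .map(|range| range.clone().collect::<Vec<i64>>())
        // A task index with no assigned range produces no numbers.
        .unwrap_or_default()
}
```

With `ranges_per_task = [0..6, 6..11]`, task 0 generates `0..=5` and task 1 generates `6..=10`, so every number is produced exactly once across the cluster.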

**NumbersExecCodec** – Protobuf-based serialization implementing `PhysicalExtensionCodec`.
It must be registered both in the `SessionStateBuilder` that initiates the query and in the one used by the `Worker`s.
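Conceptually, registration on both sides matters because a worker rebuilds `NumbersExec` from bytes, so the plan's state (here, `ranges_per_task`) has to round-trip exactly. The sketch below illustrates that round-trip with a hand-rolled little-endian encoding. This is a dependency-free stand-in chosen for illustration, not the protobuf format or the `PhysicalExtensionCodec` methods the example uses; `encode_ranges` and `decode_ranges` are hypothetical names.

```rust
use std::ops::Range;

/// Encodes each range as two little-endian i64 values: start, then end.
/// Illustrative only: the real example uses protobuf.
fn encode_ranges(ranges: &[Range<i64>]) -> Vec<u8> {
    let mut buf = Vec::with_capacity(ranges.len() * 16);
    for range in ranges {
        buf.extend_from_slice(&range.start.to_le_bytes());
        buf.extend_from_slice(&range.end.to_le_bytes());
    }
    buf
}

/// Reads one little-endian i64 from an 8-byte slice.
fn read_i64(bytes: &[u8]) -> i64 {
    let mut arr = [0u8; 8];
    arr.copy_from_slice(bytes);
    i64::from_le_bytes(arr)
}

/// Decodes bytes produced by `encode_ranges`.
/// Returns `None` if the input is not a whole number of 16-byte records.
fn decode_ranges(buf: &[u8]) -> Option<Vec<Range<i64>>> {
    if buf.len() % 16 != 0 {
        return None;
    }
    Some(
        buf.chunks_exact(16)
            .map(|chunk| read_i64(&chunk[..8])..read_i64(&chunk[8..]))
            .collect(),
    )
}
```

If only the initiating session knew the codec, the worker would receive bytes it cannot turn back into a `NumbersExec`; the same applies in reverse.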

**NumbersTaskEstimator** – Controls distributed parallelism:

- `task_estimation()` - Returns how many tasks are needed, based on the range size and the config
- `scale_up_leaf_node()` - Splits the single range of numbers into N per-task ranges
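The two responsibilities can be sketched as plain functions. This is an illustration under assumptions, not the example's code: `estimate_tasks` and `split_range` are hypothetical names with simplified signatures, and the sketch assumes the task count is the range length divided by `numbers_per_task`, rounded up, with any remainder handed to the first ranges. Those assumptions reproduce the plans shown below; for instance, 11 numbers give two tasks, `[0-6)` and `[6-11)`.

```rust
use std::ops::Range;

/// Number of tasks needed so that no task generates more than
/// `numbers_per_task` numbers (ceiling division, at least one task).
fn estimate_tasks(range: &Range<i64>, numbers_per_task: usize) -> usize {
    let len = (range.end - range.start).max(0) as usize;
    let per_task = numbers_per_task.max(1);
    ((len + per_task - 1) / per_task).max(1)
}

/// Splits `range` into `tasks` contiguous ranges whose sizes differ by at most one;
/// the first `len % tasks` ranges each receive one extra number.
fn split_range(range: &Range<i64>, tasks: usize) -> Vec<Range<i64>> {
    let tasks = tasks.max(1) as i64;
    let len = (range.end - range.start).max(0);
    let (base, extra) = (len / tasks, len % tasks);
    let mut start = range.start;
    (0..tasks)
        .map(|i| {
            let size = if i < extra { base + 1 } else { base };
            let chunk = start..start + size;
            start += size;
            chunk
        })
        .collect()
}
```

With a `numbers_per_task` of 10, `numbers(0, 10)` estimates to one task and `numbers(0, 11)` to two.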

**NumbersConfig** – Custom config extension for controlling distributed parallelism (`numbers_per_task: usize`)
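A plain-Rust sketch of what such a config holds, assuming the `numbers.numbers_per_task` key and the default of `10` described in the Usage section below. DataFusion has its own config-extension mechanism for registering and parsing options like this; the struct and `set` method here are hypothetical stand-ins that only illustrate the key/value handling, and the rejection of zero is an added assumption.

```rust
/// Hypothetical stand-in for `NumbersConfig`.
#[derive(Debug, Clone, PartialEq)]
struct NumbersConfig {
    /// Maximum amount of numbers a single task should generate.
    numbers_per_task: usize,
}

impl Default for NumbersConfig {
    fn default() -> Self {
        Self { numbers_per_task: 10 }
    }
}

impl NumbersConfig {
    /// Applies a fully qualified setting such as `numbers.numbers_per_task`.
    fn set(&mut self, key: &str, value: &str) -> Result<(), String> {
        match key {
            "numbers.numbers_per_task" => {
                let parsed: usize = value
                    .trim()
                    .parse()
                    .map_err(|e| format!("invalid value for {key}: {e}"))?;
                // Zero would make the per-task division meaningless, so reject it.
                if parsed == 0 {
                    return Err(format!("{key} must be greater than zero"));
                }
                self.numbers_per_task = parsed;
                Ok(())
            }
            _ => Err(format!("unknown config key: {key}")),
        }
    }
}
```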

## Usage

This example imports the `InMemoryWorkerResolver` and the `InMemoryChannelResolver` used during integration testing
of this project, so it must be run with the `--features integration` flag.

This example demonstrates that the larger the queried range of numbers, the more tasks are used to execute
the query. For example:

```bash
cargo run \
  --features integration \
  --example custom_execution_plan \
  "SELECT DISTINCT number FROM numbers(0, 10) ORDER BY number" \
  --show-distributed-plan
```

```
SortPreservingMergeExec: [number@0 ASC NULLS LAST]
  SortExec: expr=[number@0 ASC NULLS LAST], preserve_partitioning=[true]
    AggregateExec: mode=FinalPartitioned, gby=[number@0 as number], aggr=[]
      RepartitionExec: partitioning=Hash([number@0], 16), input_partitions=16
        AggregateExec: mode=Partial, gby=[number@0 as number], aggr=[]
          RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=1
            NumbersExec: t0:[0-10)
```

This will print a non-distributed plan, as the range of numbers we are querying (`numbers(0, 10)`) is small.

The config parameter `numbers.numbers_per_task` controls how many distributed tasks are used in the
query. Its default value is `10`, so querying 10 numbers fits in a single task and the plan is not distributed.

However, if we try to query 11 numbers:

```bash
cargo run \
  --features integration \
  --example custom_execution_plan \
  "SELECT DISTINCT number FROM numbers(0, 11) ORDER BY number" \
  --show-distributed-plan
```

```
┌───── DistributedExec ── Tasks: t0:[p0] 
│ SortPreservingMergeExec: [number@0 ASC NULLS LAST]
│   [Stage 2] => NetworkCoalesceExec: output_partitions=32, input_tasks=2
└──────────────────────────────────────────────────
  ┌───── Stage 2 ── Tasks: t0:[p0..p15] t1:[p0..p15] 
  │ SortExec: expr=[number@0 ASC NULLS LAST], preserve_partitioning=[true]
  │   AggregateExec: mode=FinalPartitioned, gby=[number@0 as number], aggr=[]
  │     [Stage 1] => NetworkShuffleExec: output_partitions=16, input_tasks=2
  └──────────────────────────────────────────────────
    ┌───── Stage 1 ── Tasks: t0:[p0..p31] t1:[p0..p31] 
    │ RepartitionExec: partitioning=Hash([number@0], 32), input_partitions=16
    │   AggregateExec: mode=Partial, gby=[number@0 as number], aggr=[]
    │     RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=1
    │       CooperativeExec
    │         NumbersExec: t0:[0-6), t1:[6-11)
    └──────────────────────────────────────────────────
```

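The distribution rule kicks in and the plan is distributed: 11 numbers no longer fit in a single 10-number task, so
the `NumbersExec` leaf is split across two tasks, `t0:[0-6)` and `t1:[6-11)`.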
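The partition counts in the plan above multiply out. With 16 partitions per task (`p0..p15`), `NetworkCoalesceExec` gathers 2 × 16 = 32 partitions from the two Stage 2 tasks, and Stage 1 hash-partitions its output into the same 2 × 16 = 32 partitions, consistent with each Stage 2 task consuming 16 of them (`NetworkShuffleExec: output_partitions=16`). The helper below is hypothetical and not part of the example; it only makes that relationship explicit, and it also matches the 100-number plans (48 = 3 × 16, 112 = 7 × 16).

```rust
/// Total partitions a stage must expose when `consumer_tasks` tasks
/// each read `partitions_per_task` partitions from it.
fn stage_output_partitions(consumer_tasks: usize, partitions_per_task: usize) -> usize {
    consumer_tasks * partitions_per_task
}
```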

Note that the parallelism of the plan is capped. For example, if we query 100 numbers:

```bash
cargo run \
  --features integration \
  --example custom_execution_plan \
  "SELECT DISTINCT number FROM numbers(0, 100) ORDER BY number" \
  --show-distributed-plan
```

```
┌───── DistributedExec ── Tasks: t0:[p0] 
│ SortPreservingMergeExec: [number@0 ASC NULLS LAST]
│   [Stage 2] => NetworkCoalesceExec: output_partitions=48, input_tasks=3
└──────────────────────────────────────────────────
  ┌───── Stage 2 ── Tasks: t0:[p0..p15] t1:[p0..p15] t2:[p0..p15] 
  │ SortExec: expr=[number@0 ASC NULLS LAST], preserve_partitioning=[true]
  │   AggregateExec: mode=FinalPartitioned, gby=[number@0 as number], aggr=[]
  │     [Stage 1] => NetworkShuffleExec: output_partitions=16, input_tasks=4
  └──────────────────────────────────────────────────
    ┌───── Stage 1 ── Tasks: t0:[p0..p47] t1:[p0..p47] t2:[p0..p47] t3:[p0..p47] 
    │ RepartitionExec: partitioning=Hash([number@0], 48), input_partitions=16
    │   AggregateExec: mode=Partial, gby=[number@0 as number], aggr=[]
    │     RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=1
    │       CooperativeExec
    │         NumbersExec: t0:[0-25), t1:[25-50), t2:[50-75), t3:[75-100)
    └──────────────────────────────────────────────────
```

We do not get 100 / 10 = 10 distributed tasks, only 4, because by default the example simulates a 4-worker
cluster. If we increase the worker count to 10, we get a highly distributed plan whose leaf stage runs 10 tasks:

```bash
cargo run \
  --features integration \
  --example custom_execution_plan \
  "SELECT DISTINCT number FROM numbers(0, 100) ORDER BY number" \
  --workers 10 \
  --show-distributed-plan
```

```
┌───── DistributedExec ── Tasks: t0:[p0] 
│ SortPreservingMergeExec: [number@0 ASC NULLS LAST]
│   [Stage 2] => NetworkCoalesceExec: output_partitions=112, input_tasks=7
└──────────────────────────────────────────────────
  ┌───── Stage 2 ── Tasks: t0:[p0..p15] t1:[p0..p15] t2:[p0..p15] t3:[p0..p15] t4:[p0..p15] t5:[p0..p15] t6:[p0..p15] 
  │ SortExec: expr=[number@0 ASC NULLS LAST], preserve_partitioning=[true]
  │   AggregateExec: mode=FinalPartitioned, gby=[number@0 as number], aggr=[]
  │     [Stage 1] => NetworkShuffleExec: output_partitions=16, input_tasks=10
  └──────────────────────────────────────────────────
    ┌───── Stage 1 ── Tasks: t0:[p0..p111] t1:[p0..p111] t2:[p0..p111] t3:[p0..p111] t4:[p0..p111] t5:[p0..p111] t6:[p0..p111] t7:[p0..p111] t8:[p0..p111] t9:[p0..p111] 
    │ RepartitionExec: partitioning=Hash([number@0], 112), input_partitions=16
    │   AggregateExec: mode=Partial, gby=[number@0 as number], aggr=[]
    │     RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=1
    │       CooperativeExec
    │         NumbersExec: t0:[0-10), t1:[10-20), t2:[20-30), t3:[30-40), t4:[40-50), t5:[50-60), t6:[60-70), t7:[70-80), t8:[80-90), t9:[90-100)
    └──────────────────────────────────────────────────
```
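The leaf task counts in the last two plans are consistent with capping the estimate at the number of workers, that is `min(ceil(len / numbers_per_task), workers)`. A self-contained sketch under that assumption follows; `leaf_tasks` is a hypothetical name, and where the real cap is applied inside the framework is not shown here.

```rust
use std::ops::Range;

/// Hypothetical: leaf task count = ceil(len / numbers_per_task), capped at `workers`.
fn leaf_tasks(range: &Range<i64>, numbers_per_task: usize, workers: usize) -> usize {
    let len = (range.end - range.start).max(0) as usize;
    let per_task = numbers_per_task.max(1);
    // Ceiling division, with at least one task for empty or tiny ranges.
    let estimate = ((len + per_task - 1) / per_task).max(1);
    estimate.min(workers.max(1))
}
```

For `numbers(0, 100)` this gives 4 tasks with the default 4 simulated workers and 10 tasks with `--workers 10`, matching the `NumbersExec` lines above.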