# Custom Distributed Plan Example
Demonstrates how to **build your own distributed plan by injecting network boundaries directly**,
instead of relying on the automatic distributed planner to decide where the stages go.
See the [Custom Distributed Plans user guide](../docs/source/user-guide/custom-distributed-plans.md)
for the concepts.
## What it shows
`NetworkShuffleExec` and `NetworkCoalesceExec` are public, constructible nodes. If a physical plan
already contains network boundaries when it reaches the distributed planner, the planner does **not**
distribute it on its own — it just finalises the boundaries you placed and wraps the result in a
`DistributedExec`. The natural place to inject them is a `PhysicalOptimizerRule`.
This example builds a **progressive partial-reduction tree** for a `GROUP BY` aggregation over the
`weather` parquet table. Instead of gathering every leaf task into a single node with one wide coalesce,
it reduces the data at every level of the tree:
```text
Final (1 task) <- finishes the aggregation
NetworkCoalesceExec M -> 1
PartialReduce (M tasks) <- merges partial states, shrinking the data again
NetworkCoalesceExec N -> M
Partial (N tasks) <- first partial reduce, one task per slice of files
DistributedLeafExec(weather)
```
The key node is `AggregateExec(mode=PartialReduce)`: unlike a plain coalesce, which only concatenates
partition streams, `PartialReduce` merges partial-aggregate states into fewer partial-aggregate states,
so less data crosses each network hop. A single `Final` aggregation on the root finishes the job. This
works for stateful aggregates too — e.g. `avg(...)` merges its (sum, count) states correctly through
the tree.
## Components
**PartialReductionTreeRule** – a `PhysicalOptimizerRule` that matches the finalising aggregate of a
two-phase aggregation, reuses the group-by / aggregate expressions from the existing `Partial` node,
and rebuilds the pipeline as `Partial → NetworkCoalesce(N→M) → PartialReduce → NetworkCoalesce(M→1) → Final`.
**Leaf splitting (automatic)** – the rule leaves the parquet scan as a plain `DataSourceExec`. The
distributed planner runs the registered `TaskEstimator` over each stage's leaves, so the default
file-scan estimator wraps the leaf in a `DistributedLeafExec` and hands each leaf-stage task its own
slice of the parquet files — no manual leaf handling in the example.
## Usage
This example uses the in-memory cluster used in integration testing, so it needs `--features integration`.
Run it from the repo root so the `testdata/weather` files resolve.
```bash
cargo run \
--features integration \
--example custom_distributed_partial_reduction_tree \
'SELECT "RainToday", count(*) FROM weather GROUP BY "RainToday"' \
--show-distributed-plan
```
```
┌───── DistributedExec ── Tasks: t0:[p0]
│ ProjectionExec: expr=[RainToday@0 as RainToday, count(Int64(1))@1 as count(*)]
│ AggregateExec: mode=Final, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
│ CoalescePartitionsExec
│ [Stage 2] => NetworkCoalesceExec: output_partitions=2, input_tasks=2
└──────────────────────────────────────────────────
┌───── Stage 2 ── Tasks: t0:[p0] t1:[p1]
│ AggregateExec: mode=PartialReduce, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
│ CoalescePartitionsExec
│ [Stage 1] => NetworkCoalesceExec: output_partitions=6, input_tasks=3
└──────────────────────────────────────────────────
┌───── Stage 1 ── Tasks: t0:[p0..p2] t1:[p3..p5] t2:[p6..p8]
│ AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
│ DistributedLeafExec: DataSourceExec: file_groups={3 groups: [[.../weather/result-000000.parquet], [.../weather/result-000001.parquet], [.../weather/result-000002.parquet]]}, projection=[RainToday], file_type=parquet
└──────────────────────────────────────────────────
```
Stage 1 is the first partial reduce on 3 tasks; note the `DistributedLeafExec` — the planner's
`TaskEstimator` wrapped the parquet scan automatically, even though we only injected the boundaries.
Stage 2 gathers 3 → 2 and partially reduces again; the root gathers 2 → 1 and finishes the aggregation.
Running it produces the answer:
```bash
cargo run --features integration --example custom_distributed_partial_reduction_tree \
'SELECT "RainToday", count(*) FROM weather GROUP BY "RainToday"'
```
```
+-----------+----------+