datafusion_distributed/execution_plans/distributed_leaf.rs
1use crate::DistributedTaskContext;
2use datafusion::common::{Result, Statistics, exec_err, not_impl_err, plan_err};
3use datafusion::execution::{SendableRecordBatchStream, TaskContext};
4use datafusion::physical_expr_common::metrics::MetricsSet;
5use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
6use std::fmt::Formatter;
7use std::sync::Arc;
8
9/// Represents a leaf node ready to be distributed across N tasks, where the variant of the node
10/// belonging to each task is stored in a `Vec` of N positions.
11///
12/// While sending this plan over the wire to a remote worker, only the appropriate variant is sent.
13///
14/// This [ExecutionPlan] implementation is typically returned by
15/// [crate::TaskEstimator::scale_up_leaf_node], which will be called for scaling up a node for
16/// distribution. The process typically looks like this:
17///
18/// 1. The distributed planner calls [crate::TaskEstimator::scale_up_leaf_node] providing a leaf
19/// node and the amount of tasks in which it should be distributed:
20///
21/// ```text
22/// ┌──────────────┐
23/// │DataSourceExec│ + 3 tasks
24/// └──────────────┘
25/// ```
26///
27/// 2. The [crate::TaskEstimator] implementation, either user provided or a default one, returns
28/// a [DistributedLeafExec] adhering to this task count:
29///
30/// ```text
31/// ┌────────────────────────────────────────────────┐
32/// │ DistributedLeafExec │
33/// │ │
34/// │┌──────────────┐┌──────────────┐┌──────────────┐│
35/// ││DataSourceExec││DataSourceExec││DataSourceExec││
36/// ││ for task 0 ││ for task 1 ││ for task 2 ││
37/// │└──────────────┘└──────────────┘└──────────────┘│
38/// └────────────────────────────────────────────────┘
39/// ```
40///
41/// 3. The [crate::DistributedExec] node, upon being executed, will send the different variants of
42/// the leaf node to the respective workers, instead of sending the full [DistributedLeafExec]:
43///
44/// ```text
45/// ┌──────────────────┐┌──────────────────┐┌──────────────────┐
46/// │ Worker 0 ││ Worker 1 ││ Worker 2 │
47/// │ ││ ││ │
48/// │ ... ││ ... ││ ... │
49/// │ ││ ││ │
50/// │ ┌──────────────┐ ││ ┌──────────────┐ ││ ┌──────────────┐ │
51/// │ │ SomeExec │ ││ │ SomeExec │ ││ │ SomeExec │ │
52/// │ │ │ ││ │ │ ││ │ │ │
53/// │ └──────────────┘ ││ └──────────────┘ ││ └──────────────┘ │
54/// │ ┌──────────────┐ ││ ┌──────────────┐ ││ ┌──────────────┐ │
55/// │ │DataSourceExec│ ││ │DataSourceExec│ ││ │DataSourceExec│ │
56/// │ │ for task 0 │ ││ │ for task 1 │ ││ │ for task 2 │ │
57/// │ └──────────────┘ ││ └──────────────┘ ││ └──────────────┘ │
58/// └──────────────────┘└──────────────────┘└──────────────────┘
59/// ```
60///
61/// This way, the different workers get to execute different versions of the same plan, each
62/// handling its own range of non-overlapping data.
63#[derive(Debug)]
64pub struct DistributedLeafExec {
65 pub(crate) original: Arc<dyn ExecutionPlan>,
66 pub(crate) properties: Arc<PlanProperties>,
67 pub(crate) variants: Vec<Arc<dyn ExecutionPlan>>,
68}
69
70impl DistributedLeafExec {
71 /// Builds a new [DistributedLeafExec] based on the provided original plan and its per-task
72 /// variants. Provided variants must expose the same partition count as the original plan.
73 pub fn try_new(
74 original: Arc<dyn ExecutionPlan>,
75 variants: impl IntoIterator<Item = Arc<dyn ExecutionPlan>>,
76 ) -> Result<Self> {
77 let mut properties = None;
78 let variants = variants
79 .into_iter()
80 .map(|plan| {
81 let plan_properties = plan.properties();
82 let Some(prev) = &properties else {
83 properties = Some(Arc::clone(plan_properties));
84 return Ok(plan);
85 };
86 if prev.partitioning.partition_count()
87 != plan_properties.partitioning.partition_count()
88 {
89 return plan_err!("Different partition count where provided in two different variants of DistributedLeafExec")
90 }
91 if !prev.eq_properties.schema().eq(plan_properties.eq_properties.schema()) {
92 return plan_err!("Different schemas where provided in two different variants of DistributedLeafExec")
93 }
94
95 Ok(plan)
96 })
97 .collect::<Result<Vec<_>>>()?;
98
99 let Some(properties) = properties else {
100 return plan_err!("Empty list of variants was provided to DistributedLeafExec");
101 };
102
103 Ok(Self {
104 original,
105 properties,
106 variants,
107 })
108 }
109
110 /// The plan this leaf was built from (the leaf passed to
111 /// [crate::TaskEstimator::scale_up_leaf_node]). Useful for recognising which `DistributedLeafExec`
112 /// you are looking at — e.g. by downcasting it to your own leaf type — before inspecting its
113 /// [DistributedLeafExec::variants].
114 pub fn original(&self) -> &Arc<dyn ExecutionPlan> {
115 &self.original
116 }
117
118 /// The per-task variants, in task order: `variants()[i]` is the plan sent to task `i`. Useful
119 /// for inspecting per-task information (e.g. data locality) when routing tasks to workers via
120 /// [crate::TaskEstimator::route_tasks].
121 pub fn variants(&self) -> &[Arc<dyn ExecutionPlan>] {
122 &self.variants
123 }
124
125 /// Returns the variant belonging to provided task index.
126 pub(crate) fn to_task_specialized(&self, task_i: usize) -> Arc<dyn ExecutionPlan> {
127 Arc::clone(&self.variants[task_i])
128 }
129}
130
131impl DisplayAs for DistributedLeafExec {
132 fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
133 write!(f, "DistributedLeafExec: ")?;
134 self.original.fmt_as(t, f)
135 }
136}
137
138impl ExecutionPlan for DistributedLeafExec {
139 fn name(&self) -> &str {
140 "DistributedLeafExec"
141 }
142
143 fn properties(&self) -> &Arc<PlanProperties> {
144 &self.properties
145 }
146
147 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
148 vec![]
149 }
150
151 fn with_new_children(
152 self: Arc<Self>,
153 _children: Vec<Arc<dyn ExecutionPlan>>,
154 ) -> Result<Arc<dyn ExecutionPlan>> {
155 not_impl_err!("DistributedLeafExec does not accept children")
156 }
157
158 fn execute(
159 &self,
160 partition: usize,
161 context: Arc<TaskContext>,
162 ) -> Result<SendableRecordBatchStream> {
163 let d_ctx = DistributedTaskContext::from_ctx(&context);
164 if d_ctx.task_count == 1 {
165 return self.original.execute(partition, context);
166 }
167
168 let Some(plan) = self.variants.get(d_ctx.task_index) else {
169 return exec_err!(
170 "Task index {} out of range for a per_task vector of length {}",
171 d_ctx.task_index,
172 self.variants.len()
173 );
174 };
175
176 plan.execute(partition, context)
177 }
178
179 fn metrics(&self) -> Option<MetricsSet> {
180 self.original.metrics()
181 }
182
183 fn partition_statistics(&self, partition: Option<usize>) -> Result<Arc<Statistics>> {
184 self.original.partition_statistics(partition)
185 }
186}