Skip to main content

datafusion_distributed/execution_plans/
distributed_leaf.rs

1use crate::DistributedTaskContext;
2use datafusion::common::{Result, Statistics, exec_err, not_impl_err, plan_err};
3use datafusion::execution::{SendableRecordBatchStream, TaskContext};
4use datafusion::physical_expr_common::metrics::MetricsSet;
5use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
6use std::fmt::Formatter;
7use std::sync::Arc;
8
9/// Represents a leaf node ready to be distributed across N tasks, where the variant of the node
10/// belonging to each task is stored in a `Vec` of N positions.
11///
12/// While sending this plan over the wire to a remote worker, only the appropriate variant is sent.
13///
14/// This [ExecutionPlan] implementation is typically returned by
15/// [crate::TaskEstimator::scale_up_leaf_node], which will be called for scaling up a node for
16/// distribution. The process typically looks like this:
17///
18/// 1. The distributed planner calls [crate::TaskEstimator::scale_up_leaf_node] providing a leaf
19///    node and the amount of tasks in which it should be distributed:
20///
21/// ```text
22/// ┌──────────────┐
23/// │DataSourceExec│ + 3 tasks
24/// └──────────────┘
25/// ```
26///
27/// 2. The [crate::TaskEstimator] implementation, either user provided or a default one, returns
28///    a [DistributedLeafExec] adhering to this task count:
29///
30/// ```text
31/// ┌────────────────────────────────────────────────┐
32/// │              DistributedLeafExec               │
33/// │                                                │
34/// │┌──────────────┐┌──────────────┐┌──────────────┐│
35/// ││DataSourceExec││DataSourceExec││DataSourceExec││
36/// ││  for task 0  ││  for task 1  ││  for task 2  ││
37/// │└──────────────┘└──────────────┘└──────────────┘│
38/// └────────────────────────────────────────────────┘
39/// ```
40///
41/// 3. The [crate::DistributedExec] node, upon being executed, will send the different variants of
42///    the leaf node to the respective workers, instead of sending the full [DistributedLeafExec]:
43///
44/// ```text
45/// ┌──────────────────┐┌──────────────────┐┌──────────────────┐
46/// │     Worker 0     ││     Worker 1     ││     Worker 2     │
47/// │                  ││                  ││                  │
48/// │       ...        ││       ...        ││       ...        │
49/// │                  ││                  ││                  │
50/// │ ┌──────────────┐ ││ ┌──────────────┐ ││ ┌──────────────┐ │
51/// │ │   SomeExec   │ ││ │   SomeExec   │ ││ │   SomeExec   │ │
52/// │ │              │ ││ │              │ ││ │              │ │
53/// │ └──────────────┘ ││ └──────────────┘ ││ └──────────────┘ │
54/// │ ┌──────────────┐ ││ ┌──────────────┐ ││ ┌──────────────┐ │
55/// │ │DataSourceExec│ ││ │DataSourceExec│ ││ │DataSourceExec│ │
56/// │ │  for task 0  │ ││ │  for task 1  │ ││ │  for task 2  │ │
57/// │ └──────────────┘ ││ └──────────────┘ ││ └──────────────┘ │
58/// └──────────────────┘└──────────────────┘└──────────────────┘
59/// ```
60///
61/// This way, the different workers get to execute different versions of the same plan, each
62/// handling its own range of non-overlapping data.
63#[derive(Debug)]
64pub struct DistributedLeafExec {
65    pub(crate) original: Arc<dyn ExecutionPlan>,
66    pub(crate) properties: Arc<PlanProperties>,
67    pub(crate) variants: Vec<Arc<dyn ExecutionPlan>>,
68}
69
70impl DistributedLeafExec {
71    /// Builds a new [DistributedLeafExec] based on the provided original plan and its per-task
72    /// variants. Provided variants must expose the same partition count as the original plan.
73    pub fn try_new(
74        original: Arc<dyn ExecutionPlan>,
75        variants: impl IntoIterator<Item = Arc<dyn ExecutionPlan>>,
76    ) -> Result<Self> {
77        let mut properties = None;
78        let variants = variants
79            .into_iter()
80            .map(|plan| {
81                let plan_properties = plan.properties();
82                let Some(prev) = &properties else {
83                    properties = Some(Arc::clone(plan_properties));
84                    return Ok(plan);
85                };
86                if prev.partitioning.partition_count()
87                    != plan_properties.partitioning.partition_count()
88                {
89                    return plan_err!("Different partition count where provided in two different variants of DistributedLeafExec")
90                }
91                if !prev.eq_properties.schema().eq(plan_properties.eq_properties.schema()) {
92                    return plan_err!("Different schemas where provided in two different variants of DistributedLeafExec")
93                }
94
95                Ok(plan)
96            })
97            .collect::<Result<Vec<_>>>()?;
98
99        let Some(properties) = properties else {
100            return plan_err!("Empty list of variants was provided to DistributedLeafExec");
101        };
102
103        Ok(Self {
104            original,
105            properties,
106            variants,
107        })
108    }
109
110    /// The plan this leaf was built from (the leaf passed to
111    /// [crate::TaskEstimator::scale_up_leaf_node]). Useful for recognising which `DistributedLeafExec`
112    /// you are looking at — e.g. by downcasting it to your own leaf type — before inspecting its
113    /// [DistributedLeafExec::variants].
114    pub fn original(&self) -> &Arc<dyn ExecutionPlan> {
115        &self.original
116    }
117
118    /// The per-task variants, in task order: `variants()[i]` is the plan sent to task `i`. Useful
119    /// for inspecting per-task information (e.g. data locality) when routing tasks to workers via
120    /// [crate::TaskEstimator::route_tasks].
121    pub fn variants(&self) -> &[Arc<dyn ExecutionPlan>] {
122        &self.variants
123    }
124
125    /// Returns the variant belonging to provided task index.
126    pub(crate) fn to_task_specialized(&self, task_i: usize) -> Arc<dyn ExecutionPlan> {
127        Arc::clone(&self.variants[task_i])
128    }
129}
130
131impl DisplayAs for DistributedLeafExec {
132    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
133        write!(f, "DistributedLeafExec: ")?;
134        self.original.fmt_as(t, f)
135    }
136}
137
138impl ExecutionPlan for DistributedLeafExec {
139    fn name(&self) -> &str {
140        "DistributedLeafExec"
141    }
142
143    fn properties(&self) -> &Arc<PlanProperties> {
144        &self.properties
145    }
146
147    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
148        vec![]
149    }
150
151    fn with_new_children(
152        self: Arc<Self>,
153        _children: Vec<Arc<dyn ExecutionPlan>>,
154    ) -> Result<Arc<dyn ExecutionPlan>> {
155        not_impl_err!("DistributedLeafExec does not accept children")
156    }
157
158    fn execute(
159        &self,
160        partition: usize,
161        context: Arc<TaskContext>,
162    ) -> Result<SendableRecordBatchStream> {
163        let d_ctx = DistributedTaskContext::from_ctx(&context);
164        if d_ctx.task_count == 1 {
165            return self.original.execute(partition, context);
166        }
167
168        let Some(plan) = self.variants.get(d_ctx.task_index) else {
169            return exec_err!(
170                "Task index {} out of range for a per_task vector of length {}",
171                d_ctx.task_index,
172                self.variants.len()
173            );
174        };
175
176        plan.execute(partition, context)
177    }
178
179    fn metrics(&self) -> Option<MetricsSet> {
180        self.original.metrics()
181    }
182
183    fn partition_statistics(&self, partition: Option<usize>) -> Result<Arc<Statistics>> {
184        self.original.partition_statistics(partition)
185    }
186}