datafusion_distributed/execution_plans/
partition_isolator.rs1use crate::DistributedTaskContext;
2use crate::common::require_one_child;
3use datafusion::execution::TaskContext;
4use datafusion::physical_expr_common::metrics::MetricsSet;
5use datafusion::physical_plan::ExecutionPlanProperties;
6use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder};
7use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
8use datafusion::{
9 error::Result,
10 execution::SendableRecordBatchStream,
11 physical_plan::{
12 DisplayAs, DisplayFormatType, EmptyRecordBatchStream, ExecutionPlan, Partitioning,
13 PlanProperties,
14 },
15};
16use futures::TryStreamExt;
17use std::{fmt::Formatter, sync::Arc};
18
/// Execution plan node that exposes, to each task, only its own share of the input
/// plan's partitions.
///
/// The input's partitions are split into contiguous groups, one per task (see
/// `partition_groups`); at execution time a task only ever runs the input partitions
/// that belong to its group.
#[derive(Debug)]
pub struct PartitionIsolatorExec {
    /// The single child plan whose partitions are divided among tasks.
    pub(crate) input: Arc<dyn ExecutionPlan>,
    /// The input's plan properties with the partitioning replaced by
    /// `Partitioning::UnknownPartitioning(k)`, `k` being the size of the first
    /// partition group.
    pub(crate) properties: Arc<PlanProperties>,
    /// Number of tasks the input partitions are divided among when the node is built
    /// and displayed.
    pub(crate) n_tasks: usize,
    /// Metrics set in which `execute` registers a per-partition output-row counter.
    pub(crate) metrics: ExecutionPlanMetricsSet,
}
61
62impl PartitionIsolatorExec {
63 pub fn new(input: Arc<dyn ExecutionPlan>, n_tasks: usize) -> Self {
64 let input_partitions = input.properties().partitioning.partition_count();
65
66 let partition_count = Self::partition_groups(input_partitions, n_tasks)[0].len();
67
68 let properties = <PlanProperties as Clone>::clone(&input.properties().clone())
69 .with_partitioning(Partitioning::UnknownPartitioning(partition_count));
70
71 Self {
72 input: input.clone(),
73 properties: Arc::new(properties),
74 n_tasks,
75 metrics: ExecutionPlanMetricsSet::new(),
76 }
77 }
78
79 fn partition_groups(input_partitions: usize, n_tasks: usize) -> Vec<Vec<usize>> {
80 let q = input_partitions / n_tasks;
81 let r = input_partitions % n_tasks;
82
83 let mut off = 0;
84 (0..n_tasks)
85 .map(|i| q + if i < r { 1 } else { 0 })
86 .map(|n| {
87 let result = (off..(off + n)).collect();
88 off += n;
89 result
90 })
91 .collect()
92 }
93
94 pub(crate) fn partition_group(
95 input_partitions: usize,
96 task_i: usize,
97 n_tasks: usize,
98 ) -> Vec<usize> {
99 Self::partition_groups(input_partitions, n_tasks)[task_i].clone()
100 }
101}
102
103impl DisplayAs for PartitionIsolatorExec {
104 fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
105 let input_partitions = self.input.output_partitioning().partition_count();
106 write!(f, "PartitionIsolatorExec: ")?;
107
108 match t {
109 DisplayFormatType::Verbose => {
110 let partition_groups = Self::partition_groups(input_partitions, self.n_tasks);
111
112 let n: usize = partition_groups.iter().map(|v| v.len()).sum();
113 let mut partitions = vec![];
114 for _ in 0..self.n_tasks {
115 partitions.push(vec!["__".to_string(); n]);
116 }
117
118 for (i, partition_group) in partition_groups.iter().enumerate() {
119 for (j, p) in partition_group.iter().enumerate() {
120 partitions[i][*p] = format!("p{j}")
121 }
122 write!(f, "t{i}:[{}] ", partitions[i].join(","))?;
123 }
124 }
125 _ => {
126 write!(f, "tasks={} partitions={}", self.n_tasks, input_partitions)?;
127 }
128 }
129 Ok(())
130 }
131}
132
133impl ExecutionPlan for PartitionIsolatorExec {
134 fn name(&self) -> &str {
135 "PartitionIsolatorExec"
136 }
137
138 fn as_any(&self) -> &dyn std::any::Any {
139 self
140 }
141
142 fn properties(&self) -> &Arc<PlanProperties> {
143 &self.properties
144 }
145
146 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
147 vec![&self.input]
148 }
149
150 fn with_new_children(
151 self: Arc<Self>,
152 children: Vec<Arc<dyn ExecutionPlan>>,
153 ) -> Result<Arc<dyn ExecutionPlan>> {
154 let input = require_one_child(children)?;
155 Ok(Arc::new(Self::new(input, self.n_tasks)))
156 }
157
158 fn execute(
159 &self,
160 partition: usize,
161 context: Arc<TaskContext>,
162 ) -> Result<SendableRecordBatchStream> {
163 let task_context = DistributedTaskContext::from_ctx(&context);
164
165 let metric = MetricBuilder::new(&self.metrics).output_rows(partition);
166 let input_partitions = self.input.output_partitioning().partition_count();
167
168 let partition_group = Self::partition_group(
169 input_partitions,
170 task_context.task_index,
171 task_context.task_count,
172 );
173
174 match partition_group.get(partition) {
179 Some(actual_partition_number) => {
180 if *actual_partition_number >= input_partitions {
181 Ok(Box::pin(EmptyRecordBatchStream::new(self.input.schema()))
183 as SendableRecordBatchStream)
184 } else {
185 Ok(Box::pin(RecordBatchStreamAdapter::new(
186 self.schema(),
187 self.input
188 .execute(*actual_partition_number, context)?
189 .inspect_ok(move |v| metric.add(v.num_rows())),
190 )))
191 }
192 }
193 None => Ok(Box::pin(EmptyRecordBatchStream::new(self.input.schema()))
194 as SendableRecordBatchStream),
195 }
196 }
197
198 fn metrics(&self) -> Option<MetricsSet> {
199 Some(self.metrics.clone_inner())
200 }
201}
202
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks the grouping for even splits, splits with a remainder, and the edge
    /// cases where some or all groups end up empty.
    #[test]
    fn test_partition_groups() {
        let cases: Vec<(usize, usize, Vec<Vec<usize>>)> = vec![
            (2, 1, vec![vec![0, 1]]),
            (6, 2, vec![vec![0, 1, 2], vec![3, 4, 5]]),
            (6, 3, vec![vec![0, 1], vec![2, 3], vec![4, 5]]),
            (6, 4, vec![vec![0, 1], vec![2, 3], vec![4], vec![5]]),
            (10, 3, vec![vec![0, 1, 2, 3], vec![4, 5, 6], vec![7, 8, 9]]),
            (
                10,
                4,
                vec![vec![0, 1, 2], vec![3, 4, 5], vec![6, 7], vec![8, 9]],
            ),
            // More tasks than partitions: the trailing tasks get nothing.
            (2, 3, vec![vec![0], vec![1], vec![]]),
            // No input partitions at all: every task gets an empty group.
            (0, 2, vec![vec![], vec![]]),
        ];

        for (input_partitions, n_tasks, expected) in cases {
            assert_eq!(
                PartitionIsolatorExec::partition_groups(input_partitions, n_tasks),
                expected,
                "input_partitions={input_partitions} n_tasks={n_tasks}"
            );
        }
    }

    /// `partition_group` must return exactly the entry of `partition_groups` for the
    /// requested task.
    #[test]
    fn test_partition_group_matches_partition_groups() {
        for input_partitions in 0..12 {
            for n_tasks in 1..6 {
                let groups = PartitionIsolatorExec::partition_groups(input_partitions, n_tasks);
                assert_eq!(groups.len(), n_tasks);

                for (task_i, expected) in groups.iter().enumerate() {
                    assert_eq!(
                        &PartitionIsolatorExec::partition_group(input_partitions, task_i, n_tasks),
                        expected,
                        "input_partitions={input_partitions} task_i={task_i} n_tasks={n_tasks}"
                    );
                }
            }
        }
    }
}