datafusion_physical_plan/
coalesce_partitions.rs1use std::any::Any;
22use std::sync::Arc;
23
24use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
25use super::stream::{ObservedStream, RecordBatchReceiverStream};
26use super::{
27 DisplayAs, ExecutionPlanProperties, PlanProperties, SendableRecordBatchStream,
28 Statistics,
29};
30use crate::execution_plan::{CardinalityEffect, EvaluationType, SchedulingType};
31use crate::projection::{make_with_child, ProjectionExec};
32use crate::{DisplayFormatType, ExecutionPlan, Partitioning};
33
34use datafusion_common::{internal_err, Result};
35use datafusion_execution::TaskContext;
36
37#[derive(Debug, Clone)]
40pub struct CoalescePartitionsExec {
41 input: Arc<dyn ExecutionPlan>,
43 metrics: ExecutionPlanMetricsSet,
45 cache: PlanProperties,
46 pub(crate) fetch: Option<usize>,
48}
49
50impl CoalescePartitionsExec {
51 pub fn new(input: Arc<dyn ExecutionPlan>) -> Self {
53 let cache = Self::compute_properties(&input);
54 CoalescePartitionsExec {
55 input,
56 metrics: ExecutionPlanMetricsSet::new(),
57 cache,
58 fetch: None,
59 }
60 }
61
62 pub fn with_fetch(mut self, fetch: Option<usize>) -> Self {
64 self.fetch = fetch;
65 self
66 }
67
68 pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
70 &self.input
71 }
72
73 fn compute_properties(input: &Arc<dyn ExecutionPlan>) -> PlanProperties {
75 let input_partitions = input.output_partitioning().partition_count();
76 let (drive, scheduling) = if input_partitions > 1 {
77 (EvaluationType::Eager, SchedulingType::Cooperative)
78 } else {
79 (
80 input.properties().evaluation_type,
81 input.properties().scheduling_type,
82 )
83 };
84
85 let mut eq_properties = input.equivalence_properties().clone();
87 eq_properties.clear_orderings();
88 eq_properties.clear_per_partition_constants();
89 PlanProperties::new(
90 eq_properties, Partitioning::UnknownPartitioning(1), input.pipeline_behavior(),
93 input.boundedness(),
94 )
95 .with_evaluation_type(drive)
96 .with_scheduling_type(scheduling)
97 }
98}
99
100impl DisplayAs for CoalescePartitionsExec {
101 fn fmt_as(
102 &self,
103 t: DisplayFormatType,
104 f: &mut std::fmt::Formatter,
105 ) -> std::fmt::Result {
106 match t {
107 DisplayFormatType::Default | DisplayFormatType::Verbose => match self.fetch {
108 Some(fetch) => {
109 write!(f, "CoalescePartitionsExec: fetch={fetch}")
110 }
111 None => write!(f, "CoalescePartitionsExec"),
112 },
113 DisplayFormatType::TreeRender => match self.fetch {
114 Some(fetch) => {
115 write!(f, "limit: {fetch}")
116 }
117 None => write!(f, ""),
118 },
119 }
120 }
121}
122
123impl ExecutionPlan for CoalescePartitionsExec {
124 fn name(&self) -> &'static str {
125 "CoalescePartitionsExec"
126 }
127
128 fn as_any(&self) -> &dyn Any {
130 self
131 }
132
133 fn properties(&self) -> &PlanProperties {
134 &self.cache
135 }
136
137 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
138 vec![&self.input]
139 }
140
141 fn benefits_from_input_partitioning(&self) -> Vec<bool> {
142 vec![false]
143 }
144
145 fn with_new_children(
146 self: Arc<Self>,
147 children: Vec<Arc<dyn ExecutionPlan>>,
148 ) -> Result<Arc<dyn ExecutionPlan>> {
149 let mut plan = CoalescePartitionsExec::new(Arc::clone(&children[0]));
150 plan.fetch = self.fetch;
151 Ok(Arc::new(plan))
152 }
153
154 fn execute(
155 &self,
156 partition: usize,
157 context: Arc<TaskContext>,
158 ) -> Result<SendableRecordBatchStream> {
159 if 0 != partition {
161 return internal_err!("CoalescePartitionsExec invalid partition {partition}");
162 }
163
164 let input_partitions = self.input.output_partitioning().partition_count();
165 match input_partitions {
166 0 => internal_err!(
167 "CoalescePartitionsExec requires at least one input partition"
168 ),
169 1 => {
170 self.input.execute(0, context)
172 }
173 _ => {
174 let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
175 let elapsed_compute = baseline_metrics.elapsed_compute().clone();
178 let _timer = elapsed_compute.timer();
179
180 let mut builder =
184 RecordBatchReceiverStream::builder(self.schema(), input_partitions);
185
186 for part_i in 0..input_partitions {
189 builder.run_input(
190 Arc::clone(&self.input),
191 part_i,
192 Arc::clone(&context),
193 );
194 }
195
196 let stream = builder.build();
197 Ok(Box::pin(ObservedStream::new(
198 stream,
199 baseline_metrics,
200 self.fetch,
201 )))
202 }
203 }
204 }
205
206 fn metrics(&self) -> Option<MetricsSet> {
207 Some(self.metrics.clone_inner())
208 }
209
210 fn statistics(&self) -> Result<Statistics> {
211 self.partition_statistics(None)
212 }
213
214 fn partition_statistics(&self, _partition: Option<usize>) -> Result<Statistics> {
215 self.input
216 .partition_statistics(None)?
217 .with_fetch(self.schema(), self.fetch, 0, 1)
218 }
219
220 fn supports_limit_pushdown(&self) -> bool {
221 true
222 }
223
224 fn cardinality_effect(&self) -> CardinalityEffect {
225 CardinalityEffect::Equal
226 }
227
228 fn try_swapping_with_projection(
232 &self,
233 projection: &ProjectionExec,
234 ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
235 if projection.expr().len() >= projection.input().schema().fields().len() {
237 return Ok(None);
238 }
239 make_with_child(projection, projection.input().children()[0]).map(|e| {
241 if self.fetch.is_some() {
242 let mut plan = CoalescePartitionsExec::new(e);
243 plan.fetch = self.fetch;
244 Some(Arc::new(plan) as _)
245 } else {
246 Some(Arc::new(CoalescePartitionsExec::new(e)) as _)
247 }
248 })
249 }
250
251 fn fetch(&self) -> Option<usize> {
252 self.fetch
253 }
254
255 fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
256 Some(Arc::new(CoalescePartitionsExec {
257 input: Arc::clone(&self.input),
258 fetch: limit,
259 metrics: self.metrics.clone(),
260 cache: self.cache.clone(),
261 }))
262 }
263}
264
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test::exec::{
        assert_strong_count_converges_to_zero, BlockingExec, PanicExec,
    };
    use crate::test::{self, assert_is_pending};
    use crate::{collect, common};

    use arrow::datatypes::{DataType, Field, Schema};

    use futures::FutureExt;

    /// Single nullable Float32 column `a`, used by the blocking/panicking inputs.
    fn float_schema() -> Arc<Schema> {
        Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)]))
    }

    #[tokio::test]
    async fn merge() -> Result<()> {
        let ctx = Arc::new(TaskContext::default());

        let partitions = 4;
        let scan = test::scan_partitioned(partitions);
        // Precondition: the fixture really is multi-partitioned.
        assert_eq!(scan.output_partitioning().partition_count(), partitions);

        let coalesce = CoalescePartitionsExec::new(scan);
        // The operator always exposes exactly one output partition.
        assert_eq!(
            coalesce.properties().output_partitioning().partition_count(),
            1
        );

        let batches = common::collect(coalesce.execute(0, ctx)?).await?;
        // One batch per input partition and 400 rows in total.
        assert_eq!(batches.len(), partitions);
        let total_rows = batches.iter().map(|b| b.num_rows()).sum::<usize>();
        assert_eq!(total_rows, 400);

        Ok(())
    }

    #[tokio::test]
    async fn test_drop_cancel() -> Result<()> {
        let ctx = Arc::new(TaskContext::default());

        let blocking = Arc::new(BlockingExec::new(float_schema(), 2));
        let refs = blocking.refs();
        let plan = Arc::new(CoalescePartitionsExec::new(blocking));

        // Start collecting, then drop the pending future; every reference the
        // blocking input handed out must eventually be released.
        let mut pending = collect(plan, ctx).boxed();
        assert_is_pending(&mut pending);
        drop(pending);
        assert_strong_count_converges_to_zero(refs).await;

        Ok(())
    }

    #[tokio::test]
    #[should_panic(expected = "PanickingStream did panic")]
    async fn test_panic() {
        let ctx = Arc::new(TaskContext::default());

        let panicking = Arc::new(PanicExec::new(float_schema(), 2));
        let plan = Arc::new(CoalescePartitionsExec::new(panicking));

        // The panic raised inside an input partition must surface to the caller.
        collect(plan, ctx).await.unwrap();
    }
}