// datafusion_physical_plan/coalesce_partitions.rs

use std::any::Any;
22use std::sync::Arc;
23
24use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
25use super::stream::{ObservedStream, RecordBatchReceiverStream};
26use super::{
27 DisplayAs, ExecutionPlanProperties, PlanProperties, SendableRecordBatchStream,
28 Statistics,
29};
30use crate::execution_plan::{CardinalityEffect, EvaluationType, SchedulingType};
31use crate::filter_pushdown::{FilterDescription, FilterPushdownPhase};
32use crate::projection::{ProjectionExec, make_with_child};
33use crate::sort_pushdown::SortOrderPushdownResult;
34use crate::{DisplayFormatType, ExecutionPlan, Partitioning};
35use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
36
37use datafusion_common::config::ConfigOptions;
38use datafusion_common::{Result, assert_eq_or_internal_err, internal_err};
39use datafusion_execution::TaskContext;
40use datafusion_physical_expr::PhysicalExpr;
41
/// Merges all partitions of its input plan into a single output partition,
/// optionally stopping after `fetch` rows have been produced.
#[derive(Debug, Clone)]
pub struct CoalescePartitionsExec {
    /// The child execution plan whose partitions are merged.
    input: Arc<dyn ExecutionPlan>,
    /// Execution metrics (baseline row counts / elapsed compute).
    metrics: ExecutionPlanMetricsSet,
    /// Cached plan properties: a single `UnknownPartitioning(1)` output with
    /// orderings and per-partition constants cleared (see `compute_properties`).
    cache: PlanProperties,
    /// Optional row limit applied to the merged output stream.
    pub(crate) fetch: Option<usize>,
}
54
55impl CoalescePartitionsExec {
56 pub fn new(input: Arc<dyn ExecutionPlan>) -> Self {
58 let cache = Self::compute_properties(&input);
59 CoalescePartitionsExec {
60 input,
61 metrics: ExecutionPlanMetricsSet::new(),
62 cache,
63 fetch: None,
64 }
65 }
66
67 pub fn with_fetch(mut self, fetch: Option<usize>) -> Self {
69 self.fetch = fetch;
70 self
71 }
72
73 pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
75 &self.input
76 }
77
78 fn compute_properties(input: &Arc<dyn ExecutionPlan>) -> PlanProperties {
80 let input_partitions = input.output_partitioning().partition_count();
81 let (drive, scheduling) = if input_partitions > 1 {
82 (EvaluationType::Eager, SchedulingType::Cooperative)
83 } else {
84 (
85 input.properties().evaluation_type,
86 input.properties().scheduling_type,
87 )
88 };
89
90 let mut eq_properties = input.equivalence_properties().clone();
92 eq_properties.clear_orderings();
93 eq_properties.clear_per_partition_constants();
94 PlanProperties::new(
95 eq_properties, Partitioning::UnknownPartitioning(1), input.pipeline_behavior(),
98 input.boundedness(),
99 )
100 .with_evaluation_type(drive)
101 .with_scheduling_type(scheduling)
102 }
103}
104
105impl DisplayAs for CoalescePartitionsExec {
106 fn fmt_as(
107 &self,
108 t: DisplayFormatType,
109 f: &mut std::fmt::Formatter,
110 ) -> std::fmt::Result {
111 match t {
112 DisplayFormatType::Default | DisplayFormatType::Verbose => match self.fetch {
113 Some(fetch) => {
114 write!(f, "CoalescePartitionsExec: fetch={fetch}")
115 }
116 None => write!(f, "CoalescePartitionsExec"),
117 },
118 DisplayFormatType::TreeRender => match self.fetch {
119 Some(fetch) => {
120 write!(f, "limit: {fetch}")
121 }
122 None => write!(f, ""),
123 },
124 }
125 }
126}
127
impl ExecutionPlan for CoalescePartitionsExec {
    fn name(&self) -> &'static str {
        "CoalescePartitionsExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.cache
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    // This node's whole purpose is to collapse partitions, so repartitioning
    // its input provides no benefit.
    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
        vec![false]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // Rebuild from the (single) new child, preserving the fetch limit.
        let mut plan = CoalescePartitionsExec::new(Arc::clone(&children[0]));
        plan.fetch = self.fetch;
        Ok(Arc::new(plan))
    }

    /// Execute the merge. Only partition 0 is valid since this node always
    /// produces exactly one output partition.
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        assert_eq_or_internal_err!(
            partition,
            0,
            "CoalescePartitionsExec invalid partition {partition}"
        );

        let input_partitions = self.input.output_partitioning().partition_count();
        match input_partitions {
            // A plan with zero input partitions is malformed.
            0 => internal_err!(
                "CoalescePartitionsExec requires at least one input partition"
            ),
            1 => {
                // Single input partition: no merging needed.
                let child_stream = self.input.execute(0, context)?;
                // Only wrap the child stream when a fetch limit must be
                // enforced; otherwise pass it through untouched.
                if self.fetch.is_some() {
                    let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
                    return Ok(Box::pin(ObservedStream::new(
                        child_stream,
                        baseline_metrics,
                        self.fetch,
                    )));
                }
                Ok(child_stream)
            }
            _ => {
                let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
                // Time the (small) setup work below; the timer stops when
                // `_timer` is dropped at the end of this arm.
                let elapsed_compute = baseline_metrics.elapsed_compute().clone();
                let _timer = elapsed_compute.timer();

                // Build a channel-backed stream with one sender per input
                // partition; each input is drained by its own spawned task.
                let mut builder =
                    RecordBatchReceiverStream::builder(self.schema(), input_partitions);

                for part_i in 0..input_partitions {
                    builder.run_input(
                        Arc::clone(&self.input),
                        part_i,
                        Arc::clone(&context),
                    );
                }

                // Wrap the merged stream so metrics and the fetch limit are
                // applied to the combined output.
                let stream = builder.build();
                Ok(Box::pin(ObservedStream::new(
                    stream,
                    baseline_metrics,
                    self.fetch,
                )))
            }
        }
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }

    fn statistics(&self) -> Result<Statistics> {
        self.partition_statistics(None)
    }

    fn partition_statistics(&self, _partition: Option<usize>) -> Result<Statistics> {
        // Output statistics are the input's overall statistics, adjusted for
        // the optional fetch limit (skip=0 over 1 output partition).
        self.input
            .partition_statistics(None)?
            .with_fetch(self.fetch, 0, 1)
    }

    fn supports_limit_pushdown(&self) -> bool {
        true
    }

    // Merging partitions neither adds nor removes rows (fetch aside, this
    // reflects the merge itself).
    fn cardinality_effect(&self) -> CardinalityEffect {
        CardinalityEffect::Equal
    }

    /// Try to push a parent projection below this node, i.e. rewrite
    /// `Projection(Coalesce(x))` as `Coalesce(Projection(x))`.
    fn try_swapping_with_projection(
        &self,
        projection: &ProjectionExec,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        // Only worthwhile when the projection narrows the schema; otherwise
        // decline the swap.
        if projection.expr().len() >= projection.input().schema().fields().len() {
            return Ok(None);
        }
        // This node has exactly one child, so indexing children()[0] is safe.
        make_with_child(projection, projection.input().children()[0]).map(|e| {
            // Preserve the fetch limit on the rebuilt coalesce node.
            if self.fetch.is_some() {
                let mut plan = CoalescePartitionsExec::new(e);
                plan.fetch = self.fetch;
                Some(Arc::new(plan) as _)
            } else {
                Some(Arc::new(CoalescePartitionsExec::new(e)) as _)
            }
        })
    }

    fn fetch(&self) -> Option<usize> {
        self.fetch
    }

    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
        // Cheap clone: same input/metrics/properties, new fetch limit.
        Some(Arc::new(CoalescePartitionsExec {
            input: Arc::clone(&self.input),
            fetch: limit,
            metrics: self.metrics.clone(),
            cache: self.cache.clone(),
        }))
    }

    // Filters can pass straight through to the child unchanged.
    fn gather_filters_for_pushdown(
        &self,
        _phase: FilterPushdownPhase,
        parent_filters: Vec<Arc<dyn PhysicalExpr>>,
        _config: &ConfigOptions,
    ) -> Result<FilterDescription> {
        FilterDescription::from_children(parent_filters, &self.children())
    }

    /// Try to push a requested sort order into the child. Even if the child
    /// can produce the order per-partition, merging multiple partitions here
    /// interleaves rows, so the result is downgraded to "inexact".
    fn try_pushdown_sort(
        &self,
        order: &[PhysicalSortExpr],
    ) -> Result<SortOrderPushdownResult<Arc<dyn ExecutionPlan>>> {
        let result = self.input.try_pushdown_sort(order)?;

        let has_multiple_partitions =
            self.input.output_partitioning().partition_count() > 1;

        result
            .try_map(|new_input| {
                // Rebuild this node on top of the reordered child, keeping
                // the fetch limit.
                Ok(
                    Arc::new(
                        CoalescePartitionsExec::new(new_input).with_fetch(self.fetch),
                    ) as Arc<dyn ExecutionPlan>,
                )
            })
            .map(|r| {
                if has_multiple_partitions {
                    // Interleaving partitions does not preserve the order.
                    r.into_inexact()
                } else {
                    r
                }
            })
    }
}
326
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test::exec::{
        BlockingExec, PanicExec, assert_strong_count_converges_to_zero,
    };
    use crate::test::{self, assert_is_pending};
    use crate::{collect, common};

    use arrow::datatypes::{DataType, Field, Schema};

    use futures::FutureExt;

    /// Merging 4 partitions yields a single output partition containing all
    /// rows (one batch per input partition).
    #[tokio::test]
    async fn merge() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());

        let num_partitions = 4;
        let csv = test::scan_partitioned(num_partitions);

        // input should have 4 partitions
        assert_eq!(csv.output_partitioning().partition_count(), num_partitions);

        let merge = CoalescePartitionsExec::new(csv);

        // output of CoalescePartitionsExec should have a single partition
        assert_eq!(
            merge.properties().output_partitioning().partition_count(),
            1
        );

        // the result should contain 4 batches (one per input partition)
        let iter = merge.execute(0, task_ctx)?;
        let batches = common::collect(iter).await?;
        assert_eq!(batches.len(), num_partitions);

        // there should be a total of 100 rows per partition (400 total)
        let row_count: usize = batches.iter().map(|batch| batch.num_rows()).sum();
        assert_eq!(row_count, 400);

        Ok(())
    }

    /// Dropping the collect future before completion must cancel the spawned
    /// input tasks and release all references to the input plan.
    #[tokio::test]
    async fn test_drop_cancel() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let schema =
            Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)]));

        let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema), 2));
        let refs = blocking_exec.refs();
        let coalesce_partitions_exec =
            Arc::new(CoalescePartitionsExec::new(blocking_exec));

        let fut = collect(coalesce_partitions_exec, task_ctx);
        let mut fut = fut.boxed();

        assert_is_pending(&mut fut);
        drop(fut);
        assert_strong_count_converges_to_zero(refs).await;

        Ok(())
    }

    /// A panic in one of the input streams must propagate out of collect
    /// rather than hanging the merge.
    #[tokio::test]
    #[should_panic(expected = "PanickingStream did panic")]
    async fn test_panic() {
        let task_ctx = Arc::new(TaskContext::default());
        let schema =
            Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)]));

        let panicking_exec = Arc::new(PanicExec::new(Arc::clone(&schema), 2));
        let coalesce_partitions_exec =
            Arc::new(CoalescePartitionsExec::new(panicking_exec));

        collect(coalesce_partitions_exec, task_ctx).await.unwrap();
    }

    /// fetch is enforced even on the single-partition pass-through path.
    #[tokio::test]
    async fn test_single_partition_with_fetch() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());

        let input = test::scan_partitioned(1);

        let coalesce = CoalescePartitionsExec::new(input).with_fetch(Some(3));

        let stream = coalesce.execute(0, task_ctx)?;
        let batches = common::collect(stream).await?;

        let row_count: usize = batches.iter().map(|batch| batch.num_rows()).sum();
        assert_eq!(row_count, 3, "Should only return 3 rows due to fetch=3");

        Ok(())
    }

    /// fetch applies to the merged output as a whole, not per input partition.
    #[tokio::test]
    async fn test_multi_partition_with_fetch_one() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());

        let input = test::scan_partitioned(4);

        let coalesce = CoalescePartitionsExec::new(input).with_fetch(Some(1));

        let stream = coalesce.execute(0, task_ctx)?;
        let batches = common::collect(stream).await?;

        let row_count: usize = batches.iter().map(|batch| batch.num_rows()).sum();
        assert_eq!(
            row_count, 1,
            "Should only return 1 row due to fetch=1, not one per partition"
        );

        Ok(())
    }

    /// Without a fetch limit the single-partition path returns all rows.
    #[tokio::test]
    async fn test_single_partition_without_fetch() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());

        let input = test::scan_partitioned(1);

        let coalesce = CoalescePartitionsExec::new(input);

        let stream = coalesce.execute(0, task_ctx)?;
        let batches = common::collect(stream).await?;

        let row_count: usize = batches.iter().map(|batch| batch.num_rows()).sum();
        assert_eq!(
            row_count, 100,
            "Should return all 100 rows when fetch is None"
        );

        Ok(())
    }

    /// A fetch larger than the available row count returns everything.
    #[tokio::test]
    async fn test_single_partition_fetch_larger_than_batch() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());

        let input = test::scan_partitioned(1);

        let coalesce = CoalescePartitionsExec::new(input).with_fetch(Some(200));

        let stream = coalesce.execute(0, task_ctx)?;
        let batches = common::collect(stream).await?;

        let row_count: usize = batches.iter().map(|batch| batch.num_rows()).sum();
        assert_eq!(
            row_count, 100,
            "Should return all available rows (100) when fetch (200) is larger"
        );

        Ok(())
    }

    /// A fetch equal to the total row count returns exactly that many rows.
    #[tokio::test]
    async fn test_multi_partition_fetch_exact_match() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());

        let num_partitions = 4;
        let csv = test::scan_partitioned(num_partitions);

        let coalesce = CoalescePartitionsExec::new(csv).with_fetch(Some(400));

        let stream = coalesce.execute(0, task_ctx)?;
        let batches = common::collect(stream).await?;

        let row_count: usize = batches.iter().map(|batch| batch.num_rows()).sum();
        assert_eq!(row_count, 400, "Should return exactly 400 rows");

        Ok(())
    }
}