// datafusion_physical_plan/coalesce_partitions.rs

use std::any::Any;
use std::sync::Arc;

use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use super::stream::{ObservedStream, RecordBatchReceiverStream};
use super::{
    DisplayAs, ExecutionPlanProperties, PlanProperties, SendableRecordBatchStream,
    Statistics,
};
use crate::execution_plan::{CardinalityEffect, EvaluationType, SchedulingType};
use crate::filter_pushdown::{FilterDescription, FilterPushdownPhase};
use crate::projection::{ProjectionExec, make_with_child};
use crate::sort_pushdown::SortOrderPushdownResult;
use crate::{DisplayFormatType, ExecutionPlan, Partitioning, check_if_same_properties};
use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;

use datafusion_common::config::ConfigOptions;
use datafusion_common::{Result, assert_eq_or_internal_err, internal_err};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::PhysicalExpr;
/// Execution plan that merges all partitions of its input into a single
/// output partition by spawning one task per input partition and funnelling
/// their batches into one stream (see `execute`). Any input ordering is
/// discarded (`compute_properties` clears orderings).
#[derive(Debug, Clone)]
pub struct CoalescePartitionsExec {
    // Child plan whose partitions are merged.
    input: Arc<dyn ExecutionPlan>,
    // Execution metrics (rows/time observed via `ObservedStream`).
    metrics: ExecutionPlanMetricsSet,
    // Cached plan properties computed from the child at construction time.
    cache: Arc<PlanProperties>,
    // Optional cap on the number of rows emitted by the merged stream.
    pub(crate) fetch: Option<usize>,
}
54
55impl CoalescePartitionsExec {
56 pub fn new(input: Arc<dyn ExecutionPlan>) -> Self {
58 let cache = Self::compute_properties(&input);
59 CoalescePartitionsExec {
60 input,
61 metrics: ExecutionPlanMetricsSet::new(),
62 cache: Arc::new(cache),
63 fetch: None,
64 }
65 }
66
67 pub fn with_fetch(mut self, fetch: Option<usize>) -> Self {
69 self.fetch = fetch;
70 self
71 }
72
73 pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
75 &self.input
76 }
77
78 fn compute_properties(input: &Arc<dyn ExecutionPlan>) -> PlanProperties {
80 let input_partitions = input.output_partitioning().partition_count();
81 let (drive, scheduling) = if input_partitions > 1 {
82 (EvaluationType::Eager, SchedulingType::Cooperative)
83 } else {
84 (
85 input.properties().evaluation_type,
86 input.properties().scheduling_type,
87 )
88 };
89
90 let mut eq_properties = input.equivalence_properties().clone();
92 eq_properties.clear_orderings();
93 eq_properties.clear_per_partition_constants();
94 PlanProperties::new(
95 eq_properties, Partitioning::UnknownPartitioning(1), input.pipeline_behavior(),
98 input.boundedness(),
99 )
100 .with_evaluation_type(drive)
101 .with_scheduling_type(scheduling)
102 }
103
104 fn with_new_children_and_same_properties(
105 &self,
106 mut children: Vec<Arc<dyn ExecutionPlan>>,
107 ) -> Self {
108 Self {
109 input: children.swap_remove(0),
110 metrics: ExecutionPlanMetricsSet::new(),
111 ..Self::clone(self)
112 }
113 }
114}
115
116impl DisplayAs for CoalescePartitionsExec {
117 fn fmt_as(
118 &self,
119 t: DisplayFormatType,
120 f: &mut std::fmt::Formatter,
121 ) -> std::fmt::Result {
122 match t {
123 DisplayFormatType::Default | DisplayFormatType::Verbose => match self.fetch {
124 Some(fetch) => {
125 write!(f, "CoalescePartitionsExec: fetch={fetch}")
126 }
127 None => write!(f, "CoalescePartitionsExec"),
128 },
129 DisplayFormatType::TreeRender => match self.fetch {
130 Some(fetch) => {
131 write!(f, "limit: {fetch}")
132 }
133 None => write!(f, ""),
134 },
135 }
136 }
137}
138
139impl ExecutionPlan for CoalescePartitionsExec {
140 fn name(&self) -> &'static str {
141 "CoalescePartitionsExec"
142 }
143
144 fn as_any(&self) -> &dyn Any {
146 self
147 }
148
149 fn properties(&self) -> &Arc<PlanProperties> {
150 &self.cache
151 }
152
153 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
154 vec![&self.input]
155 }
156
157 fn benefits_from_input_partitioning(&self) -> Vec<bool> {
158 vec![false]
159 }
160
161 fn with_new_children(
162 self: Arc<Self>,
163 mut children: Vec<Arc<dyn ExecutionPlan>>,
164 ) -> Result<Arc<dyn ExecutionPlan>> {
165 check_if_same_properties!(self, children);
166 let mut plan = CoalescePartitionsExec::new(children.swap_remove(0));
167 plan.fetch = self.fetch;
168 Ok(Arc::new(plan))
169 }
170
171 fn execute(
172 &self,
173 partition: usize,
174 context: Arc<TaskContext>,
175 ) -> Result<SendableRecordBatchStream> {
176 assert_eq_or_internal_err!(
178 partition,
179 0,
180 "CoalescePartitionsExec invalid partition {partition}"
181 );
182
183 let input_partitions = self.input.output_partitioning().partition_count();
184 match input_partitions {
185 0 => internal_err!(
186 "CoalescePartitionsExec requires at least one input partition"
187 ),
188 1 => {
189 let child_stream = self.input.execute(0, context)?;
192 if self.fetch.is_some() {
193 let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
194 return Ok(Box::pin(ObservedStream::new(
195 child_stream,
196 baseline_metrics,
197 self.fetch,
198 )));
199 }
200 Ok(child_stream)
201 }
202 _ => {
203 let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
204 let elapsed_compute = baseline_metrics.elapsed_compute().clone();
207 let _timer = elapsed_compute.timer();
208
209 let mut builder =
213 RecordBatchReceiverStream::builder(self.schema(), input_partitions);
214
215 for part_i in 0..input_partitions {
218 builder.run_input(
219 Arc::clone(&self.input),
220 part_i,
221 Arc::clone(&context),
222 );
223 }
224
225 let stream = builder.build();
226 Ok(Box::pin(ObservedStream::new(
227 stream,
228 baseline_metrics,
229 self.fetch,
230 )))
231 }
232 }
233 }
234
235 fn metrics(&self) -> Option<MetricsSet> {
236 Some(self.metrics.clone_inner())
237 }
238
239 fn partition_statistics(&self, _partition: Option<usize>) -> Result<Statistics> {
240 self.input
241 .partition_statistics(None)?
242 .with_fetch(self.fetch, 0, 1)
243 }
244
245 fn supports_limit_pushdown(&self) -> bool {
246 true
247 }
248
249 fn cardinality_effect(&self) -> CardinalityEffect {
250 CardinalityEffect::Equal
251 }
252
253 fn try_swapping_with_projection(
257 &self,
258 projection: &ProjectionExec,
259 ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
260 if projection.expr().len() >= projection.input().schema().fields().len() {
262 return Ok(None);
263 }
264 make_with_child(projection, projection.input().children()[0]).map(|e| {
266 if self.fetch.is_some() {
267 let mut plan = CoalescePartitionsExec::new(e);
268 plan.fetch = self.fetch;
269 Some(Arc::new(plan) as _)
270 } else {
271 Some(Arc::new(CoalescePartitionsExec::new(e)) as _)
272 }
273 })
274 }
275
276 fn fetch(&self) -> Option<usize> {
277 self.fetch
278 }
279
280 fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
281 Some(Arc::new(CoalescePartitionsExec {
282 input: Arc::clone(&self.input),
283 fetch: limit,
284 metrics: self.metrics.clone(),
285 cache: Arc::clone(&self.cache),
286 }))
287 }
288
289 fn with_preserve_order(
290 &self,
291 preserve_order: bool,
292 ) -> Option<Arc<dyn ExecutionPlan>> {
293 self.input
294 .with_preserve_order(preserve_order)
295 .and_then(|new_input| {
296 Arc::new(self.clone())
297 .with_new_children(vec![new_input])
298 .ok()
299 })
300 }
301
302 fn gather_filters_for_pushdown(
303 &self,
304 _phase: FilterPushdownPhase,
305 parent_filters: Vec<Arc<dyn PhysicalExpr>>,
306 _config: &ConfigOptions,
307 ) -> Result<FilterDescription> {
308 FilterDescription::from_children(parent_filters, &self.children())
309 }
310
311 fn try_pushdown_sort(
312 &self,
313 order: &[PhysicalSortExpr],
314 ) -> Result<SortOrderPushdownResult<Arc<dyn ExecutionPlan>>> {
315 let result = self.input.try_pushdown_sort(order)?;
323
324 let has_multiple_partitions =
327 self.input.output_partitioning().partition_count() > 1;
328
329 result
330 .try_map(|new_input| {
331 Ok(
332 Arc::new(
333 CoalescePartitionsExec::new(new_input).with_fetch(self.fetch),
334 ) as Arc<dyn ExecutionPlan>,
335 )
336 })
337 .map(|r| {
338 if has_multiple_partitions {
339 r.into_inexact()
341 } else {
342 r
343 }
344 })
345 }
346}
347
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test::exec::{
        BlockingExec, PanicExec, assert_strong_count_converges_to_zero,
    };
    use crate::test::{self, assert_is_pending};
    use crate::{collect, common};

    use arrow::datatypes::{DataType, Field, Schema};

    use futures::FutureExt;

    /// Merging four partitions yields one output partition containing all rows.
    #[tokio::test]
    async fn merge() -> Result<()> {
        let ctx = Arc::new(TaskContext::default());

        let partition_count = 4;
        let csv = test::scan_partitioned(partition_count);

        assert_eq!(csv.output_partitioning().partition_count(), partition_count);

        let merge = CoalescePartitionsExec::new(csv);

        // The coalesced plan must expose exactly one partition.
        assert_eq!(
            merge.properties().output_partitioning().partition_count(),
            1
        );

        let stream = merge.execute(0, ctx)?;
        let output = common::collect(stream).await?;
        assert_eq!(output.len(), partition_count);

        let total: usize = output.iter().map(|batch| batch.num_rows()).sum();
        assert_eq!(total, 400);

        Ok(())
    }

    /// Dropping the collect future must release all references to the input.
    #[tokio::test]
    async fn test_drop_cancel() -> Result<()> {
        let ctx = Arc::new(TaskContext::default());
        let schema =
            Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)]));

        let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema), 2));
        let refs = blocking_exec.refs();
        let plan = Arc::new(CoalescePartitionsExec::new(blocking_exec));

        let mut fut = collect(plan, ctx).boxed();

        assert_is_pending(&mut fut);
        drop(fut);
        assert_strong_count_converges_to_zero(refs).await;

        Ok(())
    }

    /// A panic in an input partition must surface to the caller.
    #[tokio::test]
    #[should_panic(expected = "PanickingStream did panic")]
    async fn test_panic() {
        let ctx = Arc::new(TaskContext::default());
        let schema =
            Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)]));

        let panicking_exec = Arc::new(PanicExec::new(Arc::clone(&schema), 2));
        let plan = Arc::new(CoalescePartitionsExec::new(panicking_exec));

        collect(plan, ctx).await.unwrap();
    }

    #[tokio::test]
    async fn test_single_partition_with_fetch() -> Result<()> {
        let ctx = Arc::new(TaskContext::default());

        let input = test::scan_partitioned(1);
        let plan = CoalescePartitionsExec::new(input).with_fetch(Some(3));

        let stream = plan.execute(0, ctx)?;
        let output = common::collect(stream).await?;

        let total: usize = output.iter().map(|batch| batch.num_rows()).sum();
        assert_eq!(total, 3, "Should only return 3 rows due to fetch=3");

        Ok(())
    }

    #[tokio::test]
    async fn test_multi_partition_with_fetch_one() -> Result<()> {
        let ctx = Arc::new(TaskContext::default());

        let input = test::scan_partitioned(4);
        let plan = CoalescePartitionsExec::new(input).with_fetch(Some(1));

        let stream = plan.execute(0, ctx)?;
        let output = common::collect(stream).await?;

        let total: usize = output.iter().map(|batch| batch.num_rows()).sum();
        assert_eq!(
            total, 1,
            "Should only return 1 row due to fetch=1, not one per partition"
        );

        Ok(())
    }

    #[tokio::test]
    async fn test_single_partition_without_fetch() -> Result<()> {
        let ctx = Arc::new(TaskContext::default());

        let input = test::scan_partitioned(1);
        let plan = CoalescePartitionsExec::new(input);

        let stream = plan.execute(0, ctx)?;
        let output = common::collect(stream).await?;

        let total: usize = output.iter().map(|batch| batch.num_rows()).sum();
        assert_eq!(
            total, 100,
            "Should return all 100 rows when fetch is None"
        );

        Ok(())
    }

    #[tokio::test]
    async fn test_single_partition_fetch_larger_than_batch() -> Result<()> {
        let ctx = Arc::new(TaskContext::default());

        let input = test::scan_partitioned(1);
        let plan = CoalescePartitionsExec::new(input).with_fetch(Some(200));

        let stream = plan.execute(0, ctx)?;
        let output = common::collect(stream).await?;

        let total: usize = output.iter().map(|batch| batch.num_rows()).sum();
        assert_eq!(
            total, 100,
            "Should return all available rows (100) when fetch (200) is larger"
        );

        Ok(())
    }

    #[tokio::test]
    async fn test_multi_partition_fetch_exact_match() -> Result<()> {
        let ctx = Arc::new(TaskContext::default());

        let csv = test::scan_partitioned(4);
        let plan = CoalescePartitionsExec::new(csv).with_fetch(Some(400));

        let stream = plan.execute(0, ctx)?;
        let output = common::collect(stream).await?;

        let total: usize = output.iter().map(|batch| batch.num_rows()).sum();
        assert_eq!(total, 400, "Should return exactly 400 rows");

        Ok(())
    }
}