datafusion_physical_plan/
coalesce_partitions.rs1use std::sync::Arc;
22
23use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
24use super::stream::{ObservedStream, RecordBatchReceiverStream};
25use super::{
26 DisplayAs, ExecutionPlanProperties, PlanProperties, SendableRecordBatchStream,
27 Statistics,
28};
29use crate::execution_plan::{CardinalityEffect, EvaluationType, SchedulingType};
30use crate::filter_pushdown::{FilterDescription, FilterPushdownPhase};
31use crate::projection::{ProjectionExec, make_with_child};
32use crate::sort_pushdown::SortOrderPushdownResult;
33use crate::{DisplayFormatType, ExecutionPlan, Partitioning, check_if_same_properties};
34use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
35
36use datafusion_common::config::ConfigOptions;
37use datafusion_common::{Result, assert_eq_or_internal_err, internal_err};
38use datafusion_execution::TaskContext;
39use datafusion_physical_expr::PhysicalExpr;
40
41#[derive(Debug, Clone)]
44pub struct CoalescePartitionsExec {
45 input: Arc<dyn ExecutionPlan>,
47 metrics: ExecutionPlanMetricsSet,
49 cache: Arc<PlanProperties>,
50 pub(crate) fetch: Option<usize>,
52}
53
54impl CoalescePartitionsExec {
55 pub fn new(input: Arc<dyn ExecutionPlan>) -> Self {
57 let cache = Self::compute_properties(&input);
58 CoalescePartitionsExec {
59 input,
60 metrics: ExecutionPlanMetricsSet::new(),
61 cache: Arc::new(cache),
62 fetch: None,
63 }
64 }
65
66 pub fn with_fetch(mut self, fetch: Option<usize>) -> Self {
68 self.fetch = fetch;
69 self
70 }
71
72 pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
74 &self.input
75 }
76
77 fn compute_properties(input: &Arc<dyn ExecutionPlan>) -> PlanProperties {
79 let input_partitions = input.output_partitioning().partition_count();
80 let (drive, scheduling) = if input_partitions > 1 {
81 (EvaluationType::Eager, SchedulingType::Cooperative)
82 } else {
83 (
84 input.properties().evaluation_type,
85 input.properties().scheduling_type,
86 )
87 };
88
89 let mut eq_properties = input.equivalence_properties().clone();
91 eq_properties.clear_orderings();
92 eq_properties.clear_per_partition_constants();
93 PlanProperties::new(
94 eq_properties, Partitioning::UnknownPartitioning(1), input.pipeline_behavior(),
97 input.boundedness(),
98 )
99 .with_evaluation_type(drive)
100 .with_scheduling_type(scheduling)
101 }
102
103 fn with_new_children_and_same_properties(
104 &self,
105 mut children: Vec<Arc<dyn ExecutionPlan>>,
106 ) -> Self {
107 Self {
108 input: children.swap_remove(0),
109 metrics: ExecutionPlanMetricsSet::new(),
110 ..Self::clone(self)
111 }
112 }
113}
114
115impl DisplayAs for CoalescePartitionsExec {
116 fn fmt_as(
117 &self,
118 t: DisplayFormatType,
119 f: &mut std::fmt::Formatter,
120 ) -> std::fmt::Result {
121 match t {
122 DisplayFormatType::Default | DisplayFormatType::Verbose => match self.fetch {
123 Some(fetch) => {
124 write!(f, "CoalescePartitionsExec: fetch={fetch}")
125 }
126 None => write!(f, "CoalescePartitionsExec"),
127 },
128 DisplayFormatType::TreeRender => match self.fetch {
129 Some(fetch) => {
130 write!(f, "limit: {fetch}")
131 }
132 None => write!(f, ""),
133 },
134 }
135 }
136}
137
138impl ExecutionPlan for CoalescePartitionsExec {
139 fn name(&self) -> &'static str {
140 "CoalescePartitionsExec"
141 }
142
143 fn properties(&self) -> &Arc<PlanProperties> {
145 &self.cache
146 }
147
148 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
149 vec![&self.input]
150 }
151
152 fn benefits_from_input_partitioning(&self) -> Vec<bool> {
153 vec![false]
154 }
155
156 fn with_new_children(
157 self: Arc<Self>,
158 mut children: Vec<Arc<dyn ExecutionPlan>>,
159 ) -> Result<Arc<dyn ExecutionPlan>> {
160 check_if_same_properties!(self, children);
161 let mut plan = CoalescePartitionsExec::new(children.swap_remove(0));
162 plan.fetch = self.fetch;
163 Ok(Arc::new(plan))
164 }
165
166 fn execute(
167 &self,
168 partition: usize,
169 context: Arc<TaskContext>,
170 ) -> Result<SendableRecordBatchStream> {
171 assert_eq_or_internal_err!(
173 partition,
174 0,
175 "CoalescePartitionsExec invalid partition {partition}"
176 );
177
178 let input_partitions = self.input.output_partitioning().partition_count();
179 match input_partitions {
180 0 => internal_err!(
181 "CoalescePartitionsExec requires at least one input partition"
182 ),
183 1 => {
184 let child_stream = self.input.execute(0, context)?;
187 if self.fetch.is_some() {
188 let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
189 return Ok(Box::pin(ObservedStream::new(
190 child_stream,
191 baseline_metrics,
192 self.fetch,
193 )));
194 }
195 Ok(child_stream)
196 }
197 _ => {
198 let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
199 let elapsed_compute = baseline_metrics.elapsed_compute().clone();
202 let _timer = elapsed_compute.timer();
203
204 let mut builder =
208 RecordBatchReceiverStream::builder(self.schema(), input_partitions);
209
210 for part_i in 0..input_partitions {
213 builder.run_input(
214 Arc::clone(&self.input),
215 part_i,
216 Arc::clone(&context),
217 );
218 }
219
220 let stream = builder.build();
221 Ok(Box::pin(ObservedStream::new(
222 stream,
223 baseline_metrics,
224 self.fetch,
225 )))
226 }
227 }
228 }
229
230 fn metrics(&self) -> Option<MetricsSet> {
231 Some(self.metrics.clone_inner())
232 }
233
234 fn partition_statistics(&self, _partition: Option<usize>) -> Result<Arc<Statistics>> {
235 let stats = Arc::unwrap_or_clone(self.input.partition_statistics(None)?);
236 Ok(Arc::new(stats.with_fetch(self.fetch, 0, 1)?))
237 }
238
239 fn supports_limit_pushdown(&self) -> bool {
240 true
241 }
242
243 fn cardinality_effect(&self) -> CardinalityEffect {
244 CardinalityEffect::Equal
245 }
246
247 fn try_swapping_with_projection(
251 &self,
252 projection: &ProjectionExec,
253 ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
254 if projection.expr().len() >= projection.input().schema().fields().len() {
256 return Ok(None);
257 }
258 make_with_child(projection, projection.input().children()[0]).map(|e| {
260 if self.fetch.is_some() {
261 let mut plan = CoalescePartitionsExec::new(e);
262 plan.fetch = self.fetch;
263 Some(Arc::new(plan) as _)
264 } else {
265 Some(Arc::new(CoalescePartitionsExec::new(e)) as _)
266 }
267 })
268 }
269
270 fn fetch(&self) -> Option<usize> {
271 self.fetch
272 }
273
274 fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
275 Some(Arc::new(CoalescePartitionsExec {
276 input: Arc::clone(&self.input),
277 fetch: limit,
278 metrics: self.metrics.clone(),
279 cache: Arc::clone(&self.cache),
280 }))
281 }
282
283 fn with_preserve_order(
284 &self,
285 preserve_order: bool,
286 ) -> Option<Arc<dyn ExecutionPlan>> {
287 self.input
288 .with_preserve_order(preserve_order)
289 .and_then(|new_input| {
290 Arc::new(self.clone())
291 .with_new_children(vec![new_input])
292 .ok()
293 })
294 }
295
296 fn gather_filters_for_pushdown(
297 &self,
298 _phase: FilterPushdownPhase,
299 parent_filters: Vec<Arc<dyn PhysicalExpr>>,
300 _config: &ConfigOptions,
301 ) -> Result<FilterDescription> {
302 FilterDescription::from_children(parent_filters, &self.children())
303 }
304
305 fn try_pushdown_sort(
306 &self,
307 order: &[PhysicalSortExpr],
308 ) -> Result<SortOrderPushdownResult<Arc<dyn ExecutionPlan>>> {
309 let result = self.input.try_pushdown_sort(order)?;
317
318 let has_multiple_partitions =
321 self.input.output_partitioning().partition_count() > 1;
322
323 result
324 .try_map(|new_input| {
325 Ok(
326 Arc::new(
327 CoalescePartitionsExec::new(new_input).with_fetch(self.fetch),
328 ) as Arc<dyn ExecutionPlan>,
329 )
330 })
331 .map(|r| {
332 if has_multiple_partitions {
333 r.into_inexact()
335 } else {
336 r
337 }
338 })
339 }
340}
341
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test::exec::{
        BarrierExec, BlockingExec, PanicExec, assert_strong_count_converges_to_zero,
    };
    use crate::test::{self, assert_is_pending};
    use crate::{collect, common};

    use std::time::Duration;

    use arrow::array::RecordBatch;
    use arrow::datatypes::{DataType, Field, Schema};

    use futures::FutureExt;

    /// Schema with a single nullable `Float32` column named `a`.
    fn float_schema() -> Arc<Schema> {
        Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)]))
    }

    /// Total number of rows across `batches`.
    fn total_rows(batches: &[RecordBatch]) -> usize {
        batches.iter().map(RecordBatch::num_rows).sum()
    }

    /// Coalesces `test::scan_partitioned(partitions)` with the given fetch
    /// limit, runs partition 0 to completion, and returns the row count.
    async fn coalesced_row_count(
        partitions: usize,
        fetch: Option<usize>,
    ) -> Result<usize> {
        let task_ctx = Arc::new(TaskContext::default());
        let plan = CoalescePartitionsExec::new(test::scan_partitioned(partitions))
            .with_fetch(fetch);
        let stream = plan.execute(0, task_ctx)?;
        let batches = common::collect(stream).await?;
        Ok(total_rows(&batches))
    }

    /// Four input partitions are merged into one output partition that
    /// yields one batch per input partition and 400 rows in total.
    #[tokio::test]
    async fn merge() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());

        let num_partitions = 4;
        let csv = test::scan_partitioned(num_partitions);
        assert_eq!(csv.output_partitioning().partition_count(), num_partitions);

        let merge = CoalescePartitionsExec::new(csv);
        let output_partitions =
            merge.properties().output_partitioning().partition_count();
        assert_eq!(output_partitions, 1);

        let batches = common::collect(merge.execute(0, task_ctx)?).await?;
        assert_eq!(batches.len(), num_partitions);
        assert_eq!(total_rows(&batches), 400);

        Ok(())
    }

    /// Once the input streams are running and the plan handles are dropped,
    /// no strong reference to the input plan should remain, even while the
    /// output stream is still alive.
    #[tokio::test]
    async fn drops_input_plan_after_input_streams_start() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let schema = float_schema();
        let input_partitions = 2;
        let empty_batch = RecordBatch::new_empty(Arc::clone(&schema));
        let barrier_exec =
            BarrierExec::new(vec![vec![empty_batch]; input_partitions], schema)
                .without_start_barrier()
                .with_finish_barrier()
                .with_log(false);
        let input = Arc::new(barrier_exec);
        let refs = Arc::downgrade(&input);

        // Execute, then drop the coalesce node itself; only the stream remains.
        let input_plan: Arc<BarrierExec> = Arc::clone(&input);
        let coalesce = CoalescePartitionsExec::new(input_plan);
        let stream = coalesce.execute(0, task_ctx)?;
        drop(coalesce);

        let wait_for_barrier = async {
            while !input.is_finish_barrier_reached() {
                tokio::task::yield_now().await;
            }
        };
        tokio::time::timeout(Duration::from_secs(5), wait_for_barrier)
            .await
            .expect("input streams should reach pending");

        drop(input);

        assert_strong_count_converges_to_zero(refs).await;

        drop(stream);

        Ok(())
    }

    /// Dropping a pending `collect` future releases the blocking input.
    #[tokio::test]
    async fn test_drop_cancel() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());

        let blocking_exec = Arc::new(BlockingExec::new(float_schema(), 2));
        let refs = blocking_exec.refs();
        let coalesce_partitions_exec =
            Arc::new(CoalescePartitionsExec::new(blocking_exec));

        let mut fut = collect(coalesce_partitions_exec, task_ctx).boxed();

        assert_is_pending(&mut fut);
        drop(fut);
        assert_strong_count_converges_to_zero(refs).await;

        Ok(())
    }

    /// A panic in an input stream surfaces through the coalesced plan.
    #[tokio::test]
    #[should_panic(expected = "PanickingStream did panic")]
    async fn test_panic() {
        let task_ctx = Arc::new(TaskContext::default());

        let panicking_exec = Arc::new(PanicExec::new(float_schema(), 2));
        let coalesce_partitions_exec =
            Arc::new(CoalescePartitionsExec::new(panicking_exec));

        collect(coalesce_partitions_exec, task_ctx).await.unwrap();
    }

    /// The fetch limit applies on the single-partition path.
    #[tokio::test]
    async fn test_single_partition_with_fetch() -> Result<()> {
        let row_count = coalesced_row_count(1, Some(3)).await?;
        assert_eq!(row_count, 3, "Should only return 3 rows due to fetch=3");

        Ok(())
    }

    /// The fetch limit applies to the merged output, not to each partition.
    #[tokio::test]
    async fn test_multi_partition_with_fetch_one() -> Result<()> {
        let row_count = coalesced_row_count(4, Some(1)).await?;
        assert_eq!(
            row_count, 1,
            "Should only return 1 row due to fetch=1, not one per partition"
        );

        Ok(())
    }

    /// Without a fetch limit every row of the single partition is returned.
    #[tokio::test]
    async fn test_single_partition_without_fetch() -> Result<()> {
        let row_count = coalesced_row_count(1, None).await?;
        assert_eq!(
            row_count, 100,
            "Should return all 100 rows when fetch is None"
        );

        Ok(())
    }

    /// A fetch limit larger than the input returns all available rows.
    #[tokio::test]
    async fn test_single_partition_fetch_larger_than_batch() -> Result<()> {
        let row_count = coalesced_row_count(1, Some(200)).await?;
        assert_eq!(
            row_count, 100,
            "Should return all available rows (100) when fetch (200) is larger"
        );

        Ok(())
    }

    /// A fetch limit equal to the total row count returns every row.
    #[tokio::test]
    async fn test_multi_partition_fetch_exact_match() -> Result<()> {
        let row_count = coalesced_row_count(4, Some(400)).await?;
        assert_eq!(row_count, 400, "Should return exactly 400 rows");

        Ok(())
    }
}