Skip to main content

datafusion_physical_plan/
coalesce_partitions.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Defines the merge plan for executing partitions in parallel and then merging the results
19//! into a single partition
20
21use std::sync::Arc;
22
23use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
24use super::stream::{ObservedStream, RecordBatchReceiverStream};
25use super::{
26    DisplayAs, ExecutionPlanProperties, PlanProperties, SendableRecordBatchStream,
27    Statistics,
28};
29use crate::execution_plan::{CardinalityEffect, EvaluationType, SchedulingType};
30use crate::filter_pushdown::{FilterDescription, FilterPushdownPhase};
31use crate::projection::{ProjectionExec, make_with_child};
32use crate::sort_pushdown::SortOrderPushdownResult;
33use crate::{DisplayFormatType, ExecutionPlan, Partitioning, check_if_same_properties};
34use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
35
36use datafusion_common::config::ConfigOptions;
37use datafusion_common::{Result, assert_eq_or_internal_err, internal_err};
38use datafusion_execution::TaskContext;
39use datafusion_physical_expr::PhysicalExpr;
40
41/// Merge execution plan executes partitions in parallel and combines them into a single
42/// partition. No guarantees are made about the order of the resulting partition.
43#[derive(Debug, Clone)]
44pub struct CoalescePartitionsExec {
45    /// Input execution plan
46    input: Arc<dyn ExecutionPlan>,
47    /// Execution metrics
48    metrics: ExecutionPlanMetricsSet,
49    cache: Arc<PlanProperties>,
50    /// Optional number of rows to fetch. Stops producing rows after this fetch
51    pub(crate) fetch: Option<usize>,
52}
53
54impl CoalescePartitionsExec {
55    /// Create a new CoalescePartitionsExec
56    pub fn new(input: Arc<dyn ExecutionPlan>) -> Self {
57        let cache = Self::compute_properties(&input);
58        CoalescePartitionsExec {
59            input,
60            metrics: ExecutionPlanMetricsSet::new(),
61            cache: Arc::new(cache),
62            fetch: None,
63        }
64    }
65
66    /// Update fetch with the argument
67    pub fn with_fetch(mut self, fetch: Option<usize>) -> Self {
68        self.fetch = fetch;
69        self
70    }
71
72    /// Input execution plan
73    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
74        &self.input
75    }
76
77    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
78    fn compute_properties(input: &Arc<dyn ExecutionPlan>) -> PlanProperties {
79        let input_partitions = input.output_partitioning().partition_count();
80        let (drive, scheduling) = if input_partitions > 1 {
81            (EvaluationType::Eager, SchedulingType::Cooperative)
82        } else {
83            (
84                input.properties().evaluation_type,
85                input.properties().scheduling_type,
86            )
87        };
88
89        // Coalescing partitions loses existing orderings:
90        let mut eq_properties = input.equivalence_properties().clone();
91        eq_properties.clear_orderings();
92        eq_properties.clear_per_partition_constants();
93        PlanProperties::new(
94            eq_properties,                        // Equivalence Properties
95            Partitioning::UnknownPartitioning(1), // Output Partitioning
96            input.pipeline_behavior(),
97            input.boundedness(),
98        )
99        .with_evaluation_type(drive)
100        .with_scheduling_type(scheduling)
101    }
102
103    fn with_new_children_and_same_properties(
104        &self,
105        mut children: Vec<Arc<dyn ExecutionPlan>>,
106    ) -> Self {
107        Self {
108            input: children.swap_remove(0),
109            metrics: ExecutionPlanMetricsSet::new(),
110            ..Self::clone(self)
111        }
112    }
113}
114
115impl DisplayAs for CoalescePartitionsExec {
116    fn fmt_as(
117        &self,
118        t: DisplayFormatType,
119        f: &mut std::fmt::Formatter,
120    ) -> std::fmt::Result {
121        match t {
122            DisplayFormatType::Default | DisplayFormatType::Verbose => match self.fetch {
123                Some(fetch) => {
124                    write!(f, "CoalescePartitionsExec: fetch={fetch}")
125                }
126                None => write!(f, "CoalescePartitionsExec"),
127            },
128            DisplayFormatType::TreeRender => match self.fetch {
129                Some(fetch) => {
130                    write!(f, "limit: {fetch}")
131                }
132                None => write!(f, ""),
133            },
134        }
135    }
136}
137
138impl ExecutionPlan for CoalescePartitionsExec {
139    fn name(&self) -> &'static str {
140        "CoalescePartitionsExec"
141    }
142
143    /// Return a reference to Any that can be used for downcasting
144    fn properties(&self) -> &Arc<PlanProperties> {
145        &self.cache
146    }
147
148    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
149        vec![&self.input]
150    }
151
152    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
153        vec![false]
154    }
155
156    fn with_new_children(
157        self: Arc<Self>,
158        mut children: Vec<Arc<dyn ExecutionPlan>>,
159    ) -> Result<Arc<dyn ExecutionPlan>> {
160        check_if_same_properties!(self, children);
161        let mut plan = CoalescePartitionsExec::new(children.swap_remove(0));
162        plan.fetch = self.fetch;
163        Ok(Arc::new(plan))
164    }
165
166    fn execute(
167        &self,
168        partition: usize,
169        context: Arc<TaskContext>,
170    ) -> Result<SendableRecordBatchStream> {
171        // CoalescePartitionsExec produces a single partition
172        assert_eq_or_internal_err!(
173            partition,
174            0,
175            "CoalescePartitionsExec invalid partition {partition}"
176        );
177
178        let input_partitions = self.input.output_partitioning().partition_count();
179        match input_partitions {
180            0 => internal_err!(
181                "CoalescePartitionsExec requires at least one input partition"
182            ),
183            1 => {
184                // single-partition path: execute child directly, but ensure fetch is respected
185                // (wrap with ObservedStream only if fetch is present so we don't add overhead otherwise)
186                let child_stream = self.input.execute(0, context)?;
187                if self.fetch.is_some() {
188                    let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
189                    return Ok(Box::pin(ObservedStream::new(
190                        child_stream,
191                        baseline_metrics,
192                        self.fetch,
193                    )));
194                }
195                Ok(child_stream)
196            }
197            _ => {
198                let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
199                // record the (very) minimal work done so that
200                // elapsed_compute is not reported as 0
201                let elapsed_compute = baseline_metrics.elapsed_compute().clone();
202                let _timer = elapsed_compute.timer();
203
204                // use a stream that allows each sender to put in at
205                // least one result in an attempt to maximize
206                // parallelism.
207                let mut builder =
208                    RecordBatchReceiverStream::builder(self.schema(), input_partitions);
209
210                // spawn independent tasks whose resulting streams (of batches)
211                // are sent to the channel for consumption.
212                for part_i in 0..input_partitions {
213                    builder.run_input(
214                        Arc::clone(&self.input),
215                        part_i,
216                        Arc::clone(&context),
217                    );
218                }
219
220                let stream = builder.build();
221                Ok(Box::pin(ObservedStream::new(
222                    stream,
223                    baseline_metrics,
224                    self.fetch,
225                )))
226            }
227        }
228    }
229
230    fn metrics(&self) -> Option<MetricsSet> {
231        Some(self.metrics.clone_inner())
232    }
233
234    fn partition_statistics(&self, _partition: Option<usize>) -> Result<Arc<Statistics>> {
235        let stats = Arc::unwrap_or_clone(self.input.partition_statistics(None)?);
236        Ok(Arc::new(stats.with_fetch(self.fetch, 0, 1)?))
237    }
238
239    fn supports_limit_pushdown(&self) -> bool {
240        true
241    }
242
243    fn cardinality_effect(&self) -> CardinalityEffect {
244        CardinalityEffect::Equal
245    }
246
247    /// Tries to swap `projection` with its input, which is known to be a
248    /// [`CoalescePartitionsExec`]. If possible, performs the swap and returns
249    /// [`CoalescePartitionsExec`] as the top plan. Otherwise, returns `None`.
250    fn try_swapping_with_projection(
251        &self,
252        projection: &ProjectionExec,
253    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
254        // If the projection does not narrow the schema, we should not try to push it down:
255        if projection.expr().len() >= projection.input().schema().fields().len() {
256            return Ok(None);
257        }
258        // CoalescePartitionsExec always has a single child, so zero indexing is safe.
259        make_with_child(projection, projection.input().children()[0]).map(|e| {
260            if self.fetch.is_some() {
261                let mut plan = CoalescePartitionsExec::new(e);
262                plan.fetch = self.fetch;
263                Some(Arc::new(plan) as _)
264            } else {
265                Some(Arc::new(CoalescePartitionsExec::new(e)) as _)
266            }
267        })
268    }
269
270    fn fetch(&self) -> Option<usize> {
271        self.fetch
272    }
273
274    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
275        Some(Arc::new(CoalescePartitionsExec {
276            input: Arc::clone(&self.input),
277            fetch: limit,
278            metrics: self.metrics.clone(),
279            cache: Arc::clone(&self.cache),
280        }))
281    }
282
283    fn with_preserve_order(
284        &self,
285        preserve_order: bool,
286    ) -> Option<Arc<dyn ExecutionPlan>> {
287        self.input
288            .with_preserve_order(preserve_order)
289            .and_then(|new_input| {
290                Arc::new(self.clone())
291                    .with_new_children(vec![new_input])
292                    .ok()
293            })
294    }
295
296    fn gather_filters_for_pushdown(
297        &self,
298        _phase: FilterPushdownPhase,
299        parent_filters: Vec<Arc<dyn PhysicalExpr>>,
300        _config: &ConfigOptions,
301    ) -> Result<FilterDescription> {
302        FilterDescription::from_children(parent_filters, &self.children())
303    }
304
305    fn try_pushdown_sort(
306        &self,
307        order: &[PhysicalSortExpr],
308    ) -> Result<SortOrderPushdownResult<Arc<dyn ExecutionPlan>>> {
309        // CoalescePartitionsExec merges multiple partitions into one, which loses
310        // global ordering. However, we can still push the sort requirement down
311        // to optimize individual partitions - the Sort operator above will handle
312        // the global ordering.
313        //
314        // Note: The result will always be at most Inexact (never Exact) when there
315        // are multiple partitions, because merging destroys global ordering.
316        let result = self.input.try_pushdown_sort(order)?;
317
318        // If we have multiple partitions, we can't return Exact even if the
319        // underlying source claims Exact - merging destroys global ordering
320        let has_multiple_partitions =
321            self.input.output_partitioning().partition_count() > 1;
322
323        result
324            .try_map(|new_input| {
325                Ok(
326                    Arc::new(
327                        CoalescePartitionsExec::new(new_input).with_fetch(self.fetch),
328                    ) as Arc<dyn ExecutionPlan>,
329                )
330            })
331            .map(|r| {
332                if has_multiple_partitions {
333                    // Downgrade Exact to Inexact when merging multiple partitions
334                    r.into_inexact()
335                } else {
336                    r
337                }
338            })
339    }
340}
341
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test::exec::{
        BarrierExec, BlockingExec, PanicExec, assert_strong_count_converges_to_zero,
    };
    use crate::test::{self, assert_is_pending};
    use crate::{collect, common};

    use std::time::Duration;

    use arrow::array::RecordBatch;
    use arrow::datatypes::{DataType, Field, Schema};

    use futures::FutureExt;

    /// Schema with a single nullable `Float32` column named `a`.
    fn float_column_schema() -> Arc<Schema> {
        Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)]))
    }

    /// Sum of the row counts of every batch in `batches`.
    fn total_rows(batches: &[RecordBatch]) -> usize {
        batches.iter().map(RecordBatch::num_rows).sum()
    }

    /// Executes the only output partition of `plan` with a default task
    /// context and gathers every batch it produces.
    async fn run_to_completion(
        plan: &CoalescePartitionsExec,
    ) -> Result<Vec<RecordBatch>> {
        let ctx = Arc::new(TaskContext::default());
        common::collect(plan.execute(0, ctx)?).await
    }

    #[tokio::test]
    async fn merge() -> Result<()> {
        let partition_count = 4;
        let scan = test::scan_partitioned(partition_count);
        // Four partitions go in...
        assert_eq!(scan.output_partitioning().partition_count(), partition_count);

        let coalesce = CoalescePartitionsExec::new(scan);
        // ...and exactly one comes out.
        assert_eq!(
            coalesce.properties().output_partitioning().partition_count(),
            1
        );

        // One batch of 100 rows is expected from each input partition.
        let batches = run_to_completion(&coalesce).await?;
        assert_eq!(batches.len(), partition_count);
        assert_eq!(total_rows(&batches), 400);

        Ok(())
    }

    #[tokio::test]
    async fn drops_input_plan_after_input_streams_start() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let schema = float_column_schema();
        let partitions = 2;
        let empty_batch = RecordBatch::new_empty(Arc::clone(&schema));
        let barrier = Arc::new(
            BarrierExec::new(vec![vec![empty_batch]; partitions], schema)
                .without_start_barrier()
                .with_finish_barrier()
                .with_log(false),
        );
        let weak_barrier = Arc::downgrade(&barrier);

        // An explicitly typed binding lets the concrete Arc coerce to
        // `Arc<dyn ExecutionPlan>` at the constructor call.
        let as_plan: Arc<BarrierExec> = Arc::clone(&barrier);
        let coalesce = CoalescePartitionsExec::new(as_plan);
        let stream = coalesce.execute(0, task_ctx)?;
        drop(coalesce);

        tokio::time::timeout(Duration::from_secs(5), async {
            // Poll instead of calling `wait_finish`: that would release the
            // barrier, let the input tasks finish and drop their Arcs, and so
            // hide the bug this test guards against.
            while !barrier.is_finish_barrier_reached() {
                tokio::task::yield_now().await;
            }
        })
        .await
        .expect("input streams should reach pending");

        drop(barrier);

        // The stream is still alive and its inputs are parked at the barrier,
        // yet the input plan itself must become unreferenced.
        assert_strong_count_converges_to_zero(weak_barrier).await;

        drop(stream);

        Ok(())
    }

    #[tokio::test]
    async fn test_drop_cancel() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let blocking = Arc::new(BlockingExec::new(float_column_schema(), 2));
        let refs = blocking.refs();
        let plan = Arc::new(CoalescePartitionsExec::new(blocking));

        let mut pending = collect(plan, task_ctx).boxed();
        assert_is_pending(&mut pending);

        // Dropping the unfinished future must release every reference to the input.
        drop(pending);
        assert_strong_count_converges_to_zero(refs).await;

        Ok(())
    }

    #[tokio::test]
    #[should_panic(expected = "PanickingStream did panic")]
    async fn test_panic() {
        let task_ctx = Arc::new(TaskContext::default());
        let panicking = Arc::new(PanicExec::new(float_column_schema(), 2));
        let plan = Arc::new(CoalescePartitionsExec::new(panicking));

        collect(plan, task_ctx).await.unwrap();
    }

    #[tokio::test]
    async fn test_single_partition_with_fetch() -> Result<()> {
        // One partition holding 100 rows, capped at 3.
        let coalesce =
            CoalescePartitionsExec::new(test::scan_partitioned(1)).with_fetch(Some(3));

        let batches = run_to_completion(&coalesce).await?;
        assert_eq!(
            total_rows(&batches),
            3,
            "Should only return 3 rows due to fetch=3"
        );

        Ok(())
    }

    #[tokio::test]
    async fn test_multi_partition_with_fetch_one() -> Result<()> {
        // Four partitions of 100 rows each; the limit must apply to the merged
        // output as a whole rather than to every partition individually.
        let coalesce =
            CoalescePartitionsExec::new(test::scan_partitioned(4)).with_fetch(Some(1));

        let batches = run_to_completion(&coalesce).await?;
        assert_eq!(
            total_rows(&batches),
            1,
            "Should only return 1 row due to fetch=1, not one per partition"
        );

        Ok(())
    }

    #[tokio::test]
    async fn test_single_partition_without_fetch() -> Result<()> {
        // No limit: the single partition's 100 rows all come through.
        let coalesce = CoalescePartitionsExec::new(test::scan_partitioned(1));

        let batches = run_to_completion(&coalesce).await?;
        assert_eq!(
            total_rows(&batches),
            100,
            "Should return all 100 rows when fetch is None"
        );

        Ok(())
    }

    #[tokio::test]
    async fn test_single_partition_fetch_larger_than_batch() -> Result<()> {
        // A limit above the available 100 rows must not truncate anything.
        let coalesce =
            CoalescePartitionsExec::new(test::scan_partitioned(1)).with_fetch(Some(200));

        let batches = run_to_completion(&coalesce).await?;
        assert_eq!(
            total_rows(&batches),
            100,
            "Should return all available rows (100) when fetch (200) is larger"
        );

        Ok(())
    }

    #[tokio::test]
    async fn test_multi_partition_fetch_exact_match() -> Result<()> {
        // Four partitions of 100 rows with a limit equal to the total row count.
        let coalesce =
            CoalescePartitionsExec::new(test::scan_partitioned(4)).with_fetch(Some(400));

        let batches = run_to_completion(&coalesce).await?;
        assert_eq!(total_rows(&batches), 400, "Should return exactly 400 rows");

        Ok(())
    }
}