// datafusion_physical_plan/coalesce_partitions.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Defines the merge plan for executing partitions in parallel and then merging the results
//! into a single partition

use std::any::Any;
use std::sync::Arc;

use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use super::stream::{ObservedStream, RecordBatchReceiverStream};
use super::{
    DisplayAs, ExecutionPlanProperties, PlanProperties, SendableRecordBatchStream,
    Statistics,
};
use crate::execution_plan::{CardinalityEffect, EvaluationType, SchedulingType};
use crate::filter_pushdown::{FilterDescription, FilterPushdownPhase};
use crate::projection::{ProjectionExec, make_with_child};
use crate::sort_pushdown::SortOrderPushdownResult;
use crate::{DisplayFormatType, ExecutionPlan, Partitioning, check_if_same_properties};

use datafusion_common::config::ConfigOptions;
use datafusion_common::{Result, assert_eq_or_internal_err, internal_err};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
41
42/// Merge execution plan executes partitions in parallel and combines them into a single
43/// partition. No guarantees are made about the order of the resulting partition.
44#[derive(Debug, Clone)]
45pub struct CoalescePartitionsExec {
46    /// Input execution plan
47    input: Arc<dyn ExecutionPlan>,
48    /// Execution metrics
49    metrics: ExecutionPlanMetricsSet,
50    cache: Arc<PlanProperties>,
51    /// Optional number of rows to fetch. Stops producing rows after this fetch
52    pub(crate) fetch: Option<usize>,
53}
54
55impl CoalescePartitionsExec {
56    /// Create a new CoalescePartitionsExec
57    pub fn new(input: Arc<dyn ExecutionPlan>) -> Self {
58        let cache = Self::compute_properties(&input);
59        CoalescePartitionsExec {
60            input,
61            metrics: ExecutionPlanMetricsSet::new(),
62            cache: Arc::new(cache),
63            fetch: None,
64        }
65    }
66
67    /// Update fetch with the argument
68    pub fn with_fetch(mut self, fetch: Option<usize>) -> Self {
69        self.fetch = fetch;
70        self
71    }
72
73    /// Input execution plan
74    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
75        &self.input
76    }
77
78    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
79    fn compute_properties(input: &Arc<dyn ExecutionPlan>) -> PlanProperties {
80        let input_partitions = input.output_partitioning().partition_count();
81        let (drive, scheduling) = if input_partitions > 1 {
82            (EvaluationType::Eager, SchedulingType::Cooperative)
83        } else {
84            (
85                input.properties().evaluation_type,
86                input.properties().scheduling_type,
87            )
88        };
89
90        // Coalescing partitions loses existing orderings:
91        let mut eq_properties = input.equivalence_properties().clone();
92        eq_properties.clear_orderings();
93        eq_properties.clear_per_partition_constants();
94        PlanProperties::new(
95            eq_properties,                        // Equivalence Properties
96            Partitioning::UnknownPartitioning(1), // Output Partitioning
97            input.pipeline_behavior(),
98            input.boundedness(),
99        )
100        .with_evaluation_type(drive)
101        .with_scheduling_type(scheduling)
102    }
103
104    fn with_new_children_and_same_properties(
105        &self,
106        mut children: Vec<Arc<dyn ExecutionPlan>>,
107    ) -> Self {
108        Self {
109            input: children.swap_remove(0),
110            metrics: ExecutionPlanMetricsSet::new(),
111            ..Self::clone(self)
112        }
113    }
114}
115
116impl DisplayAs for CoalescePartitionsExec {
117    fn fmt_as(
118        &self,
119        t: DisplayFormatType,
120        f: &mut std::fmt::Formatter,
121    ) -> std::fmt::Result {
122        match t {
123            DisplayFormatType::Default | DisplayFormatType::Verbose => match self.fetch {
124                Some(fetch) => {
125                    write!(f, "CoalescePartitionsExec: fetch={fetch}")
126                }
127                None => write!(f, "CoalescePartitionsExec"),
128            },
129            DisplayFormatType::TreeRender => match self.fetch {
130                Some(fetch) => {
131                    write!(f, "limit: {fetch}")
132                }
133                None => write!(f, ""),
134            },
135        }
136    }
137}
138
139impl ExecutionPlan for CoalescePartitionsExec {
140    fn name(&self) -> &'static str {
141        "CoalescePartitionsExec"
142    }
143
144    /// Return a reference to Any that can be used for downcasting
145    fn as_any(&self) -> &dyn Any {
146        self
147    }
148
149    fn properties(&self) -> &Arc<PlanProperties> {
150        &self.cache
151    }
152
153    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
154        vec![&self.input]
155    }
156
157    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
158        vec![false]
159    }
160
161    fn with_new_children(
162        self: Arc<Self>,
163        mut children: Vec<Arc<dyn ExecutionPlan>>,
164    ) -> Result<Arc<dyn ExecutionPlan>> {
165        check_if_same_properties!(self, children);
166        let mut plan = CoalescePartitionsExec::new(children.swap_remove(0));
167        plan.fetch = self.fetch;
168        Ok(Arc::new(plan))
169    }
170
171    fn execute(
172        &self,
173        partition: usize,
174        context: Arc<TaskContext>,
175    ) -> Result<SendableRecordBatchStream> {
176        // CoalescePartitionsExec produces a single partition
177        assert_eq_or_internal_err!(
178            partition,
179            0,
180            "CoalescePartitionsExec invalid partition {partition}"
181        );
182
183        let input_partitions = self.input.output_partitioning().partition_count();
184        match input_partitions {
185            0 => internal_err!(
186                "CoalescePartitionsExec requires at least one input partition"
187            ),
188            1 => {
189                // single-partition path: execute child directly, but ensure fetch is respected
190                // (wrap with ObservedStream only if fetch is present so we don't add overhead otherwise)
191                let child_stream = self.input.execute(0, context)?;
192                if self.fetch.is_some() {
193                    let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
194                    return Ok(Box::pin(ObservedStream::new(
195                        child_stream,
196                        baseline_metrics,
197                        self.fetch,
198                    )));
199                }
200                Ok(child_stream)
201            }
202            _ => {
203                let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
204                // record the (very) minimal work done so that
205                // elapsed_compute is not reported as 0
206                let elapsed_compute = baseline_metrics.elapsed_compute().clone();
207                let _timer = elapsed_compute.timer();
208
209                // use a stream that allows each sender to put in at
210                // least one result in an attempt to maximize
211                // parallelism.
212                let mut builder =
213                    RecordBatchReceiverStream::builder(self.schema(), input_partitions);
214
215                // spawn independent tasks whose resulting streams (of batches)
216                // are sent to the channel for consumption.
217                for part_i in 0..input_partitions {
218                    builder.run_input(
219                        Arc::clone(&self.input),
220                        part_i,
221                        Arc::clone(&context),
222                    );
223                }
224
225                let stream = builder.build();
226                Ok(Box::pin(ObservedStream::new(
227                    stream,
228                    baseline_metrics,
229                    self.fetch,
230                )))
231            }
232        }
233    }
234
235    fn metrics(&self) -> Option<MetricsSet> {
236        Some(self.metrics.clone_inner())
237    }
238
239    fn partition_statistics(&self, _partition: Option<usize>) -> Result<Statistics> {
240        self.input
241            .partition_statistics(None)?
242            .with_fetch(self.fetch, 0, 1)
243    }
244
245    fn supports_limit_pushdown(&self) -> bool {
246        true
247    }
248
249    fn cardinality_effect(&self) -> CardinalityEffect {
250        CardinalityEffect::Equal
251    }
252
253    /// Tries to swap `projection` with its input, which is known to be a
254    /// [`CoalescePartitionsExec`]. If possible, performs the swap and returns
255    /// [`CoalescePartitionsExec`] as the top plan. Otherwise, returns `None`.
256    fn try_swapping_with_projection(
257        &self,
258        projection: &ProjectionExec,
259    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
260        // If the projection does not narrow the schema, we should not try to push it down:
261        if projection.expr().len() >= projection.input().schema().fields().len() {
262            return Ok(None);
263        }
264        // CoalescePartitionsExec always has a single child, so zero indexing is safe.
265        make_with_child(projection, projection.input().children()[0]).map(|e| {
266            if self.fetch.is_some() {
267                let mut plan = CoalescePartitionsExec::new(e);
268                plan.fetch = self.fetch;
269                Some(Arc::new(plan) as _)
270            } else {
271                Some(Arc::new(CoalescePartitionsExec::new(e)) as _)
272            }
273        })
274    }
275
276    fn fetch(&self) -> Option<usize> {
277        self.fetch
278    }
279
280    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
281        Some(Arc::new(CoalescePartitionsExec {
282            input: Arc::clone(&self.input),
283            fetch: limit,
284            metrics: self.metrics.clone(),
285            cache: Arc::clone(&self.cache),
286        }))
287    }
288
289    fn with_preserve_order(
290        &self,
291        preserve_order: bool,
292    ) -> Option<Arc<dyn ExecutionPlan>> {
293        self.input
294            .with_preserve_order(preserve_order)
295            .and_then(|new_input| {
296                Arc::new(self.clone())
297                    .with_new_children(vec![new_input])
298                    .ok()
299            })
300    }
301
302    fn gather_filters_for_pushdown(
303        &self,
304        _phase: FilterPushdownPhase,
305        parent_filters: Vec<Arc<dyn PhysicalExpr>>,
306        _config: &ConfigOptions,
307    ) -> Result<FilterDescription> {
308        FilterDescription::from_children(parent_filters, &self.children())
309    }
310
311    fn try_pushdown_sort(
312        &self,
313        order: &[PhysicalSortExpr],
314    ) -> Result<SortOrderPushdownResult<Arc<dyn ExecutionPlan>>> {
315        // CoalescePartitionsExec merges multiple partitions into one, which loses
316        // global ordering. However, we can still push the sort requirement down
317        // to optimize individual partitions - the Sort operator above will handle
318        // the global ordering.
319        //
320        // Note: The result will always be at most Inexact (never Exact) when there
321        // are multiple partitions, because merging destroys global ordering.
322        let result = self.input.try_pushdown_sort(order)?;
323
324        // If we have multiple partitions, we can't return Exact even if the
325        // underlying source claims Exact - merging destroys global ordering
326        let has_multiple_partitions =
327            self.input.output_partitioning().partition_count() > 1;
328
329        result
330            .try_map(|new_input| {
331                Ok(
332                    Arc::new(
333                        CoalescePartitionsExec::new(new_input).with_fetch(self.fetch),
334                    ) as Arc<dyn ExecutionPlan>,
335                )
336            })
337            .map(|r| {
338                if has_multiple_partitions {
339                    // Downgrade Exact to Inexact when merging multiple partitions
340                    r.into_inexact()
341                } else {
342                    r
343                }
344            })
345    }
346}
347
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test::exec::{
        BlockingExec, PanicExec, assert_strong_count_converges_to_zero,
    };
    use crate::test::{self, assert_is_pending};
    use crate::{collect, common};

    use arrow::datatypes::{DataType, Field, Schema};

    use futures::FutureExt;

    #[tokio::test]
    async fn merge() -> Result<()> {
        let ctx = Arc::new(TaskContext::default());

        let num_partitions = 4;
        let csv = test::scan_partitioned(num_partitions);

        // input should have 4 partitions
        assert_eq!(csv.output_partitioning().partition_count(), num_partitions);

        let merge = CoalescePartitionsExec::new(csv);

        // output of CoalescePartitionsExec should have a single partition
        assert_eq!(
            merge.properties().output_partitioning().partition_count(),
            1
        );

        // the result should contain 4 batches (one per input partition)
        let batches = common::collect(merge.execute(0, ctx)?).await?;
        assert_eq!(batches.len(), num_partitions);

        // there should be a total of 400 rows (100 per each partition)
        let total: usize = batches.iter().map(|b| b.num_rows()).sum();
        assert_eq!(total, 400);

        Ok(())
    }

    #[tokio::test]
    async fn test_drop_cancel() -> Result<()> {
        let ctx = Arc::new(TaskContext::default());
        let schema =
            Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)]));

        let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema), 2));
        let refs = blocking_exec.refs();
        let plan = Arc::new(CoalescePartitionsExec::new(blocking_exec));

        // Start collecting, confirm it is pending, then drop the future:
        // all input references must eventually be released.
        let mut fut = collect(plan, ctx).boxed();
        assert_is_pending(&mut fut);
        drop(fut);
        assert_strong_count_converges_to_zero(refs).await;

        Ok(())
    }

    #[tokio::test]
    #[should_panic(expected = "PanickingStream did panic")]
    async fn test_panic() {
        let ctx = Arc::new(TaskContext::default());
        let schema =
            Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)]));

        // A panic in any input partition must propagate to the caller.
        let panicking_exec = Arc::new(PanicExec::new(Arc::clone(&schema), 2));
        let plan = Arc::new(CoalescePartitionsExec::new(panicking_exec));

        collect(plan, ctx).await.unwrap();
    }

    #[tokio::test]
    async fn test_single_partition_with_fetch() -> Result<()> {
        let ctx = Arc::new(TaskContext::default());

        // Use existing scan_partitioned with 1 partition (returns 100 rows per partition)
        let input = test::scan_partitioned(1);

        // Test with fetch=3
        let coalesce = CoalescePartitionsExec::new(input).with_fetch(Some(3));

        let batches = common::collect(coalesce.execute(0, ctx)?).await?;

        let total: usize = batches.iter().map(|b| b.num_rows()).sum();
        assert_eq!(total, 3, "Should only return 3 rows due to fetch=3");

        Ok(())
    }

    #[tokio::test]
    async fn test_multi_partition_with_fetch_one() -> Result<()> {
        let ctx = Arc::new(TaskContext::default());

        // Create 4 partitions, each with 100 rows
        // This simulates the real-world scenario where each partition has data
        let input = test::scan_partitioned(4);

        // Test with fetch=1 (the original bug: was returning multiple rows instead of 1)
        let coalesce = CoalescePartitionsExec::new(input).with_fetch(Some(1));

        let batches = common::collect(coalesce.execute(0, ctx)?).await?;

        let total: usize = batches.iter().map(|b| b.num_rows()).sum();
        assert_eq!(
            total, 1,
            "Should only return 1 row due to fetch=1, not one per partition"
        );

        Ok(())
    }

    #[tokio::test]
    async fn test_single_partition_without_fetch() -> Result<()> {
        let ctx = Arc::new(TaskContext::default());

        // Use scan_partitioned with 1 partition
        let input = test::scan_partitioned(1);

        // Test without fetch (should return all rows)
        let coalesce = CoalescePartitionsExec::new(input);

        let batches = common::collect(coalesce.execute(0, ctx)?).await?;

        let total: usize = batches.iter().map(|b| b.num_rows()).sum();
        assert_eq!(
            total, 100,
            "Should return all 100 rows when fetch is None"
        );

        Ok(())
    }

    #[tokio::test]
    async fn test_single_partition_fetch_larger_than_batch() -> Result<()> {
        let ctx = Arc::new(TaskContext::default());

        // Use scan_partitioned with 1 partition (returns 100 rows)
        let input = test::scan_partitioned(1);

        // Test with fetch larger than available rows
        let coalesce = CoalescePartitionsExec::new(input).with_fetch(Some(200));

        let batches = common::collect(coalesce.execute(0, ctx)?).await?;

        let total: usize = batches.iter().map(|b| b.num_rows()).sum();
        assert_eq!(
            total, 100,
            "Should return all available rows (100) when fetch (200) is larger"
        );

        Ok(())
    }

    #[tokio::test]
    async fn test_multi_partition_fetch_exact_match() -> Result<()> {
        let ctx = Arc::new(TaskContext::default());

        // Create 4 partitions, each with 100 rows
        let num_partitions = 4;
        let csv = test::scan_partitioned(num_partitions);

        // Test with fetch=400 (exactly all rows)
        let coalesce = CoalescePartitionsExec::new(csv).with_fetch(Some(400));

        let batches = common::collect(coalesce.execute(0, ctx)?).await?;

        let total: usize = batches.iter().map(|b| b.num_rows()).sum();
        assert_eq!(total, 400, "Should return exactly 400 rows");

        Ok(())
    }
}