// datafusion_physical_plan/coalesce_partitions.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Defines the merge plan for executing partitions in parallel and then merging the results
//! into a single partition

use std::any::Any;
use std::sync::Arc;

use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use super::stream::{ObservedStream, RecordBatchReceiverStream};
use super::{
    DisplayAs, ExecutionPlanProperties, PlanProperties, SendableRecordBatchStream,
    Statistics,
};
use crate::execution_plan::{CardinalityEffect, EvaluationType, SchedulingType};
use crate::filter_pushdown::{FilterDescription, FilterPushdownPhase};
use crate::projection::{ProjectionExec, make_with_child};
use crate::sort_pushdown::SortOrderPushdownResult;
use crate::{DisplayFormatType, ExecutionPlan, Partitioning};

use datafusion_common::config::ConfigOptions;
use datafusion_common::{Result, assert_eq_or_internal_err, internal_err};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
/// Merge execution plan executes partitions in parallel and combines them into a single
/// partition. No guarantees are made about the order of the resulting partition.
#[derive(Debug, Clone)]
pub struct CoalescePartitionsExec {
    /// Input execution plan
    input: Arc<dyn ExecutionPlan>,
    /// Execution metrics
    metrics: ExecutionPlanMetricsSet,
    /// Plan properties (partitioning, equivalences, ...) computed once from
    /// `input` at construction time and served from `properties()`
    cache: PlanProperties,
    /// Optional number of rows to fetch. Stops producing rows after this fetch
    pub(crate) fetch: Option<usize>,
}

55impl CoalescePartitionsExec {
56    /// Create a new CoalescePartitionsExec
57    pub fn new(input: Arc<dyn ExecutionPlan>) -> Self {
58        let cache = Self::compute_properties(&input);
59        CoalescePartitionsExec {
60            input,
61            metrics: ExecutionPlanMetricsSet::new(),
62            cache,
63            fetch: None,
64        }
65    }
66
67    /// Update fetch with the argument
68    pub fn with_fetch(mut self, fetch: Option<usize>) -> Self {
69        self.fetch = fetch;
70        self
71    }
72
73    /// Input execution plan
74    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
75        &self.input
76    }
77
78    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
79    fn compute_properties(input: &Arc<dyn ExecutionPlan>) -> PlanProperties {
80        let input_partitions = input.output_partitioning().partition_count();
81        let (drive, scheduling) = if input_partitions > 1 {
82            (EvaluationType::Eager, SchedulingType::Cooperative)
83        } else {
84            (
85                input.properties().evaluation_type,
86                input.properties().scheduling_type,
87            )
88        };
89
90        // Coalescing partitions loses existing orderings:
91        let mut eq_properties = input.equivalence_properties().clone();
92        eq_properties.clear_orderings();
93        eq_properties.clear_per_partition_constants();
94        PlanProperties::new(
95            eq_properties,                        // Equivalence Properties
96            Partitioning::UnknownPartitioning(1), // Output Partitioning
97            input.pipeline_behavior(),
98            input.boundedness(),
99        )
100        .with_evaluation_type(drive)
101        .with_scheduling_type(scheduling)
102    }
103}
104
105impl DisplayAs for CoalescePartitionsExec {
106    fn fmt_as(
107        &self,
108        t: DisplayFormatType,
109        f: &mut std::fmt::Formatter,
110    ) -> std::fmt::Result {
111        match t {
112            DisplayFormatType::Default | DisplayFormatType::Verbose => match self.fetch {
113                Some(fetch) => {
114                    write!(f, "CoalescePartitionsExec: fetch={fetch}")
115                }
116                None => write!(f, "CoalescePartitionsExec"),
117            },
118            DisplayFormatType::TreeRender => match self.fetch {
119                Some(fetch) => {
120                    write!(f, "limit: {fetch}")
121                }
122                None => write!(f, ""),
123            },
124        }
125    }
126}
127
impl ExecutionPlan for CoalescePartitionsExec {
    fn name(&self) -> &'static str {
        "CoalescePartitionsExec"
    }

    /// Return a reference to Any that can be used for downcasting
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.cache
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    // This operator exists to undo partitioning, so additional input
    // partitions do not benefit it
    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
        vec![false]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // Carry the fetch limit over to the rebuilt node
        let mut plan = CoalescePartitionsExec::new(Arc::clone(&children[0]));
        plan.fetch = self.fetch;
        Ok(Arc::new(plan))
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        // CoalescePartitionsExec produces a single partition
        assert_eq_or_internal_err!(
            partition,
            0,
            "CoalescePartitionsExec invalid partition {partition}"
        );

        let input_partitions = self.input.output_partitioning().partition_count();
        match input_partitions {
            0 => internal_err!(
                "CoalescePartitionsExec requires at least one input partition"
            ),
            1 => {
                // single-partition path: execute child directly, but ensure fetch is respected
                // (wrap with ObservedStream only if fetch is present so we don't add overhead otherwise)
                let child_stream = self.input.execute(0, context)?;
                if self.fetch.is_some() {
                    let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
                    return Ok(Box::pin(ObservedStream::new(
                        child_stream,
                        baseline_metrics,
                        self.fetch,
                    )));
                }
                Ok(child_stream)
            }
            _ => {
                let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
                // record the (very) minimal work done so that
                // elapsed_compute is not reported as 0
                let elapsed_compute = baseline_metrics.elapsed_compute().clone();
                let _timer = elapsed_compute.timer();

                // use a stream that allows each sender to put in at
                // least one result in an attempt to maximize
                // parallelism.
                let mut builder =
                    RecordBatchReceiverStream::builder(self.schema(), input_partitions);

                // spawn independent tasks whose resulting streams (of batches)
                // are sent to the channel for consumption.
                for part_i in 0..input_partitions {
                    builder.run_input(
                        Arc::clone(&self.input),
                        part_i,
                        Arc::clone(&context),
                    );
                }

                // ObservedStream records baseline metrics and enforces `fetch`
                // on the merged stream
                let stream = builder.build();
                Ok(Box::pin(ObservedStream::new(
                    stream,
                    baseline_metrics,
                    self.fetch,
                )))
            }
        }
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }

    fn statistics(&self) -> Result<Statistics> {
        self.partition_statistics(None)
    }

    fn partition_statistics(&self, _partition: Option<usize>) -> Result<Statistics> {
        // The single output partition carries the input's overall statistics,
        // capped by `fetch` when one is set
        self.input
            .partition_statistics(None)?
            .with_fetch(self.fetch, 0, 1)
    }

    fn supports_limit_pushdown(&self) -> bool {
        true
    }

    // Merging partitions neither adds nor removes rows (any `fetch` limit is
    // reported separately via `fetch()`)
    fn cardinality_effect(&self) -> CardinalityEffect {
        CardinalityEffect::Equal
    }

    /// Tries to swap `projection` with its input, which is known to be a
    /// [`CoalescePartitionsExec`]. If possible, performs the swap and returns
    /// [`CoalescePartitionsExec`] as the top plan. Otherwise, returns `None`.
    fn try_swapping_with_projection(
        &self,
        projection: &ProjectionExec,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        // If the projection does not narrow the schema, we should not try to push it down:
        if projection.expr().len() >= projection.input().schema().fields().len() {
            return Ok(None);
        }
        // CoalescePartitionsExec always has a single child, so zero indexing is safe.
        make_with_child(projection, projection.input().children()[0]).map(|e| {
            if self.fetch.is_some() {
                // Rebuild with the same fetch limit on top of the pushed-down projection
                let mut plan = CoalescePartitionsExec::new(e);
                plan.fetch = self.fetch;
                Some(Arc::new(plan) as _)
            } else {
                Some(Arc::new(CoalescePartitionsExec::new(e)) as _)
            }
        })
    }

    fn fetch(&self) -> Option<usize> {
        self.fetch
    }

    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
        // Cheap clone: only the fetch limit changes; metrics/cache are shared/copied
        Some(Arc::new(CoalescePartitionsExec {
            input: Arc::clone(&self.input),
            fetch: limit,
            metrics: self.metrics.clone(),
            cache: self.cache.clone(),
        }))
    }

    fn gather_filters_for_pushdown(
        &self,
        _phase: FilterPushdownPhase,
        parent_filters: Vec<Arc<dyn PhysicalExpr>>,
        _config: &ConfigOptions,
    ) -> Result<FilterDescription> {
        // Filters pass through unchanged to the single child
        FilterDescription::from_children(parent_filters, &self.children())
    }

    fn try_pushdown_sort(
        &self,
        order: &[PhysicalSortExpr],
    ) -> Result<SortOrderPushdownResult<Arc<dyn ExecutionPlan>>> {
        // CoalescePartitionsExec merges multiple partitions into one, which loses
        // global ordering. However, we can still push the sort requirement down
        // to optimize individual partitions - the Sort operator above will handle
        // the global ordering.
        //
        // Note: The result will always be at most Inexact (never Exact) when there
        // are multiple partitions, because merging destroys global ordering.
        let result = self.input.try_pushdown_sort(order)?;

        // If we have multiple partitions, we can't return Exact even if the
        // underlying source claims Exact - merging destroys global ordering
        let has_multiple_partitions =
            self.input.output_partitioning().partition_count() > 1;

        result
            .try_map(|new_input| {
                Ok(
                    Arc::new(
                        CoalescePartitionsExec::new(new_input).with_fetch(self.fetch),
                    ) as Arc<dyn ExecutionPlan>,
                )
            })
            .map(|r| {
                if has_multiple_partitions {
                    // Downgrade Exact to Inexact when merging multiple partitions
                    r.into_inexact()
                } else {
                    r
                }
            })
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::test::exec::{
        BlockingExec, PanicExec, assert_strong_count_converges_to_zero,
    };
    use crate::test::{self, assert_is_pending};
    use crate::{collect, common};

    use arrow::datatypes::{DataType, Field, Schema};

    use futures::FutureExt;

    /// Merging N partitions produces one output partition with all rows.
    #[tokio::test]
    async fn merge() -> Result<()> {
        let ctx = Arc::new(TaskContext::default());

        let partitions = 4;
        let input = test::scan_partitioned(partitions);

        // sanity check: the scan really has 4 partitions
        assert_eq!(input.output_partitioning().partition_count(), partitions);

        let coalesce = CoalescePartitionsExec::new(input);

        // output of CoalescePartitionsExec should have a single partition
        assert_eq!(
            coalesce.properties().output_partitioning().partition_count(),
            1
        );

        // the result should contain 4 batches (one per input partition)
        let batches = common::collect(coalesce.execute(0, ctx)?).await?;
        assert_eq!(batches.len(), partitions);

        // there should be a total of 400 rows (100 per each partition)
        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
        assert_eq!(total_rows, 400);

        Ok(())
    }

    /// Dropping the output future must release all references to the input.
    #[tokio::test]
    async fn test_drop_cancel() -> Result<()> {
        let ctx = Arc::new(TaskContext::default());
        let schema =
            Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)]));

        let blocking = Arc::new(BlockingExec::new(Arc::clone(&schema), 2));
        let refs = blocking.refs();
        let plan = Arc::new(CoalescePartitionsExec::new(blocking));

        let mut fut = collect(plan, ctx).boxed();

        assert_is_pending(&mut fut);
        drop(fut);
        assert_strong_count_converges_to_zero(refs).await;

        Ok(())
    }

    /// A panic in an input partition propagates out of `collect`.
    #[tokio::test]
    #[should_panic(expected = "PanickingStream did panic")]
    async fn test_panic() {
        let ctx = Arc::new(TaskContext::default());
        let schema =
            Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)]));

        let panicking = Arc::new(PanicExec::new(Arc::clone(&schema), 2));
        let plan = Arc::new(CoalescePartitionsExec::new(panicking));

        collect(plan, ctx).await.unwrap();
    }

    /// Fetch is honored on the single-partition fast path.
    #[tokio::test]
    async fn test_single_partition_with_fetch() -> Result<()> {
        let ctx = Arc::new(TaskContext::default());

        // one partition with 100 rows, limited to 3
        let input = test::scan_partitioned(1);
        let coalesce = CoalescePartitionsExec::new(input).with_fetch(Some(3));

        let batches = common::collect(coalesce.execute(0, ctx)?).await?;

        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
        assert_eq!(total_rows, 3, "Should only return 3 rows due to fetch=3");

        Ok(())
    }

    /// Fetch=1 applies globally, not once per input partition.
    #[tokio::test]
    async fn test_multi_partition_with_fetch_one() -> Result<()> {
        let ctx = Arc::new(TaskContext::default());

        // 4 partitions of 100 rows each; historically this returned one row
        // per partition instead of one row total
        let input = test::scan_partitioned(4);
        let coalesce = CoalescePartitionsExec::new(input).with_fetch(Some(1));

        let batches = common::collect(coalesce.execute(0, ctx)?).await?;

        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
        assert_eq!(
            total_rows, 1,
            "Should only return 1 row due to fetch=1, not one per partition"
        );

        Ok(())
    }

    /// Without a fetch, the single-partition path returns every row.
    #[tokio::test]
    async fn test_single_partition_without_fetch() -> Result<()> {
        let ctx = Arc::new(TaskContext::default());

        let input = test::scan_partitioned(1);
        let coalesce = CoalescePartitionsExec::new(input);

        let batches = common::collect(coalesce.execute(0, ctx)?).await?;

        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
        assert_eq!(
            total_rows, 100,
            "Should return all 100 rows when fetch is None"
        );

        Ok(())
    }

    /// A fetch larger than the input is a no-op.
    #[tokio::test]
    async fn test_single_partition_fetch_larger_than_batch() -> Result<()> {
        let ctx = Arc::new(TaskContext::default());

        // one partition of 100 rows with fetch=200
        let input = test::scan_partitioned(1);
        let coalesce = CoalescePartitionsExec::new(input).with_fetch(Some(200));

        let batches = common::collect(coalesce.execute(0, ctx)?).await?;

        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
        assert_eq!(
            total_rows, 100,
            "Should return all available rows (100) when fetch (200) is larger"
        );

        Ok(())
    }

    /// A fetch equal to the total row count returns everything exactly once.
    #[tokio::test]
    async fn test_multi_partition_fetch_exact_match() -> Result<()> {
        let ctx = Arc::new(TaskContext::default());

        // 4 partitions of 100 rows each with fetch=400 (exactly all rows)
        let input = test::scan_partitioned(4);
        let coalesce = CoalescePartitionsExec::new(input).with_fetch(Some(400));

        let batches = common::collect(coalesce.execute(0, ctx)?).await?;

        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
        assert_eq!(total_rows, 400, "Should return exactly 400 rows");

        Ok(())
    }
}